http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 614ee28..0ee33b3 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -39,10 +39,7 @@ namespace minifi {
 
 std::atomic<uint64_t> FlowFileRecord::local_flow_seq_number_(0);
 
-FlowFileRecord::FlowFileRecord(
-    std::shared_ptr<core::Repository> flow_repository,
-    std::map<std::string, std::string> attributes,
-    std::shared_ptr<ResourceClaim> claim)
+FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> 
flow_repository, std::map<std::string, std::string> attributes, 
std::shared_ptr<ResourceClaim> claim)
     : FlowFile(),
       flow_repository_(flow_repository),
       logger_(logging::LoggerFactory<FlowFileRecord>::getLogger()) {
@@ -68,9 +65,7 @@ FlowFileRecord::FlowFileRecord(
     claim_->increaseFlowFileRecordOwnedCount();
 }
 
-FlowFileRecord::FlowFileRecord(
-    std::shared_ptr<core::Repository> flow_repository,
-    std::shared_ptr<core::FlowFile> &event, const std::string &uuidConnection)
+FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> 
flow_repository, std::shared_ptr<core::FlowFile> &event, const std::string 
&uuidConnection)
     : FlowFile(),
       snapshot_(""),
       flow_repository_(flow_repository),
@@ -89,9 +84,7 @@ FlowFileRecord::FlowFileRecord(
   }
 }
 
-FlowFileRecord::FlowFileRecord(
-    std::shared_ptr<core::Repository> flow_repository,
-    std::shared_ptr<core::FlowFile> &event)
+FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> 
flow_repository, std::shared_ptr<core::FlowFile> &event)
     : FlowFile(),
       uuid_connection_(""),
       snapshot_(""),
@@ -109,8 +102,7 @@ FlowFileRecord::~FlowFileRecord() {
     claim_->decreaseFlowFileRecordOwnedCount();
     std::string value;
     if (claim_->getFlowFileRecordOwnedCount() <= 0) {
-      logger_->log_debug("Delete Resource Claim %s",
-                         claim_->getContentFullPath().c_str());
+      logger_->log_debug("Delete Resource Claim %s", 
claim_->getContentFullPath().c_str());
       if (!this->stored || !flow_repository_->Get(uuid_str_, value)) {
         std::remove(claim_->getContentFullPath().c_str());
       }
@@ -138,8 +130,7 @@ bool FlowFileRecord::removeKeyedAttribute(FlowAttribute 
key) {
   }
 }
 
-bool FlowFileRecord::updateKeyedAttribute(FlowAttribute key,
-                                          std::string value) {
+bool FlowFileRecord::updateKeyedAttribute(FlowAttribute key, std::string 
value) {
   const char *keyStr = FlowAttributeKey(key);
   if (keyStr) {
     std::string keyString = keyStr;
@@ -174,25 +165,19 @@ bool FlowFileRecord::DeSerialize(std::string key) {
   ret = flow_repository_->Get(key, value);
 
   if (!ret) {
-    logger_->log_error("NiFi FlowFile Store event %s can not found",
-                       key.c_str());
+    logger_->log_error("NiFi FlowFile Store event %s can not found", 
key.c_str());
     return false;
   } else {
-    logger_->log_debug("NiFi FlowFile Read event %s length %d", key.c_str(),
-                       value.length());
+    logger_->log_debug("NiFi FlowFile Read event %s length %d", key.c_str(), 
value.length());
   }
   io::DataStream stream((const uint8_t*) value.data(), value.length());
 
   ret = DeSerialize(stream);
 
   if (ret) {
-    logger_->log_debug(
-        "NiFi FlowFile retrieve uuid %s size %d connection %s success",
-        uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str());
+    logger_->log_debug("NiFi FlowFile retrieve uuid %s size %d connection %s 
success", uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str());
   } else {
-    logger_->log_debug(
-        "NiFi FlowFile retrieve uuid %s size %d connection %d fail",
-        uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str());
+    logger_->log_debug("NiFi FlowFile retrieve uuid %s size %d connection %d 
fail", uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str());
   }
 
   return ret;
@@ -260,15 +245,11 @@ bool FlowFileRecord::Serialize() {
     return false;
   }
 
-  if (flow_repository_->Put(uuid_str_,
-                            const_cast<uint8_t*>(outStream.getBuffer()),
-                            outStream.getSize())) {
-    logger_->log_debug("NiFi FlowFile Store event %s size %d success",
-                       uuid_str_.c_str(), outStream.getSize());
+  if (flow_repository_->Put(uuid_str_, 
const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) {
+    logger_->log_debug("NiFi FlowFile Store event %s size %d success", 
uuid_str_.c_str(), outStream.getSize());
     return true;
   } else {
-    logger_->log_error("NiFi FlowFile Store event %s size %d fail",
-                       uuid_str_.c_str(), outStream.getSize());
+    logger_->log_error("NiFi FlowFile Store event %s size %d fail", 
uuid_str_.c_str(), outStream.getSize());
     return false;
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/Properties.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp
index f877e7a..abebfbb 100644
--- a/libminifi/src/Properties.cpp
+++ b/libminifi/src/Properties.cpp
@@ -28,7 +28,9 @@ namespace minifi {
 
 #define BUFFER_SIZE 512
 
-Properties::Properties() : 
logger_(logging::LoggerFactory<Properties>::getLogger()) {}
+Properties::Properties()
+    : logger_(logging::LoggerFactory<Properties>::getLogger()) {
+}
 
 // Get the config value
 bool Properties::get(std::string key, std::string &value) {
@@ -62,8 +64,7 @@ void Properties::parseConfigureFileLine(char *buf) {
     ++line;
 
   char first = line[0];
-  if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n')
-      || (first == '=')) {
+  if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') 
|| (first == '=')) {
     return;
   }
 
@@ -96,8 +97,7 @@ void Properties::loadConfigureFile(const char *fileName) {
   if (fileName) {
     // perform a naive determination if this is a relative path
     if (fileName[0] != '/') {
-      adjustedFilename = adjustedFilename + getHome() + "/"
-          + fileName;
+      adjustedFilename = adjustedFilename + getHome() + "/" + fileName;
     } else {
       adjustedFilename += fileName;
     }
@@ -115,8 +115,7 @@ void Properties::loadConfigureFile(const char *fileName) {
   this->clear();
 
   char buf[BUFFER_SIZE];
-  for (file.getline(buf, BUFFER_SIZE); file.good();
-      file.getline(buf, BUFFER_SIZE)) {
+  for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, 
BUFFER_SIZE)) {
     parseConfigureFileLine(buf);
   }
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/RemoteProcessorGroupPort.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp 
b/libminifi/src/RemoteProcessorGroupPort.cpp
index 2e7e61a..ca8d3be 100644
--- a/libminifi/src/RemoteProcessorGroupPort.cpp
+++ b/libminifi/src/RemoteProcessorGroupPort.cpp
@@ -43,39 +43,29 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-const std::string RemoteProcessorGroupPort::ProcessorName(
-    "RemoteProcessorGroupPort");
-core::Property RemoteProcessorGroupPort::hostName("Host Name",
-                                                  "Remote Host Name.",
-                                                  "localhost");
+const char 
*RemoteProcessorGroupPort::ProcessorName("RemoteProcessorGroupPort");
+core::Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host 
Name.", "localhost");
 core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999");
-core::Property RemoteProcessorGroupPort::portUUID(
-    "Port UUID", "Specifies remote NiFi Port UUID.", "");
+core::Property RemoteProcessorGroupPort::portUUID("Port UUID", "Specifies 
remote NiFi Port UUID.", "");
 core::Relationship RemoteProcessorGroupPort::relation;
 
 std::unique_ptr<Site2SiteClientProtocol> 
RemoteProcessorGroupPort::getNextProtocol(
-    bool create = true) {
+bool create = true) {
   std::unique_ptr<Site2SiteClientProtocol> nextProtocol = nullptr;
   if (!available_protocols_.try_dequeue(nextProtocol)) {
     if (create) {
       // create
-      nextProtocol = std::unique_ptr<Site2SiteClientProtocol>(
-          new Site2SiteClientProtocol(nullptr));
+      nextProtocol = std::unique_ptr<Site2SiteClientProtocol>(new 
Site2SiteClientProtocol(nullptr));
       nextProtocol->setPortId(protocol_uuid_);
-      std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str =
-          std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(
-              stream_factory_->createSocket(host_, port_));
-      std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>(
-          new Site2SitePeer(std::move(str), host_, port_));
+      std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = 
std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(stream_factory_->createSocket(host_,
 port_));
+      std::unique_ptr<Site2SitePeer> peer_ = 
std::unique_ptr<Site2SitePeer>(new Site2SitePeer(std::move(str), host_, port_));
       nextProtocol->setPeer(std::move(peer_));
     }
   }
   return std::move(nextProtocol);
 }
 
-void RemoteProcessorGroupPort::returnProtocol(
-    std::unique_ptr<Site2SiteClientProtocol> return_protocol) {
-
+void 
RemoteProcessorGroupPort::returnProtocol(std::unique_ptr<Site2SiteClientProtocol>
 return_protocol) {
   if (available_protocols_.size_approx() >= max_concurrent_tasks_) {
     // let the memory be freed
     return;
@@ -96,9 +86,7 @@ void RemoteProcessorGroupPort::initialize() {
   setSupportedRelationships(relationships);
 }
 
-void RemoteProcessorGroupPort::onSchedule(
-    core::ProcessContext *context,
-    core::ProcessSessionFactory *sessionFactory) {
+void RemoteProcessorGroupPort::onSchedule(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory) {
   std::string value;
 
   int64_t lvalue;
@@ -106,8 +94,7 @@ void RemoteProcessorGroupPort::onSchedule(
   if (context->getProperty(hostName.getName(), value)) {
     host_ = value;
   }
-  if (context->getProperty(port.getName(), value)
-      && core::Property::StringToInt(value, lvalue)) {
+  if (context->getProperty(port.getName(), value) && 
core::Property::StringToInt(value, lvalue)) {
     port_ = (uint16_t) lvalue;
   }
   if (context->getProperty(portUUID.getName(), value)) {
@@ -115,8 +102,7 @@ void RemoteProcessorGroupPort::onSchedule(
   }
 }
 
-void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context,
-                                         core::ProcessSession *session) {
+void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, 
core::ProcessSession *session) {
   if (!transmitting_)
     return;
 
@@ -130,8 +116,7 @@ void 
RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context,
   if (context->getProperty(hostName.getName(), value)) {
     host = value;
   }
-  if (context->getProperty(port.getName(), value)
-      && core::Property::StringToInt(value, lvalue)) {
+  if (context->getProperty(port.getName(), value) && 
core::Property::StringToInt(value, lvalue)) {
     sport = (uint16_t) lvalue;
   }
   if (context->getProperty(portUUID.getName(), value)) {
@@ -148,10 +133,8 @@ void 
RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context,
   if (!protocol_->bootstrap()) {
     // bootstrap the client protocol if needeed
     context->yield();
-    std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(
-        context->getProcessorNode().getProcessor());
-    logger_->log_error("Site2Site bootstrap failed yield period %d peer ",
-                       processor->getYieldPeriodMsec());
+    std::shared_ptr<Processor> processor = 
std::static_pointer_cast<Processor>(context->getProcessorNode().getProcessor());
+    logger_->log_error("Site2Site bootstrap failed yield period %d peer ", 
processor->getYieldPeriodMsec());
     returnProtocol(std::move(protocol_));
     return;
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/SchedulingAgent.cpp 
b/libminifi/src/SchedulingAgent.cpp
index c582c52..24ba146 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -33,18 +33,14 @@ namespace minifi {
 
 bool SchedulingAgent::hasWorkToDo(std::shared_ptr<core::Processor> processor) {
   // Whether it has work to do
-  if (processor->getTriggerWhenEmpty() || !processor->hasIncomingConnections()
-      || processor->flowFilesQueued())
+  if (processor->getTriggerWhenEmpty() || !processor->hasIncomingConnections() 
|| processor->flowFilesQueued())
     return true;
   else
     return false;
 }
 
-void SchedulingAgent::enableControllerService(
-    std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-
-  logger_->log_trace("Enabling CSN in SchedulingAgent %s",
-                     serviceNode->getName());
+void 
SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) {
+  logger_->log_trace("Enabling CSN in SchedulingAgent %s", 
serviceNode->getName());
   // reference the enable function from serviceNode
   std::function<bool()> f_ex = [serviceNode] {
     return serviceNode->enable();
@@ -57,9 +53,7 @@ void SchedulingAgent::enableControllerService(
   component_lifecycle_thread_pool_.execute(std::move(functor), future);
 }
 
-void SchedulingAgent::disableControllerService(
-    std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
-
+void 
SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode>
 &serviceNode) {
   // reference the disable function from serviceNode
   std::function<bool()> f_ex = [serviceNode] {
     return serviceNode->disable();
@@ -72,14 +66,11 @@ void SchedulingAgent::disableControllerService(
   component_lifecycle_thread_pool_.execute(std::move(functor), future);
 }
 
-bool SchedulingAgent::hasTooMuchOutGoing(
-    std::shared_ptr<core::Processor> processor) {
+bool SchedulingAgent::hasTooMuchOutGoing(std::shared_ptr<core::Processor> 
processor) {
   return processor->flowFilesOutGoingFull();
 }
 
-bool SchedulingAgent::onTrigger(std::shared_ptr<core::Processor> processor,
-                                core::ProcessContext *processContext,
-                                core::ProcessSessionFactory *sessionFactory) {
+bool SchedulingAgent::onTrigger(std::shared_ptr<core::Processor> processor, 
core::ProcessContext *processContext, core::ProcessSessionFactory 
*sessionFactory) {
   if (processor->isYield())
     return false;
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/Site2SiteClientProtocol.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SiteClientProtocol.cpp 
b/libminifi/src/Site2SiteClientProtocol.cpp
index 475d49d..08ee665 100644
--- a/libminifi/src/Site2SiteClientProtocol.cpp
+++ b/libminifi/src/Site2SiteClientProtocol.cpp
@@ -38,8 +38,7 @@ namespace minifi {
 
 bool Site2SiteClientProtocol::establish() {
   if (_peerState != IDLE) {
-    logger_->log_error(
-        "Site2Site peer state is not idle while try to establish");
+    logger_->log_error("Site2Site peer state is not idle while try to 
establish");
     return false;
   }
 
@@ -70,14 +69,11 @@ bool Site2SiteClientProtocol::establish() {
 bool Site2SiteClientProtocol::initiateResourceNegotiation() {
   // Negotiate the version
   if (_peerState != IDLE) {
-    logger_->log_error(
-        "Site2Site peer state is not idle while initiateResourceNegotiation");
+    logger_->log_error("Site2Site peer state is not idle while 
initiateResourceNegotiation");
     return false;
   }
 
-  logger_->log_info(
-      "Negotiate protocol version with destination port %s current version %d",
-      _portIdStr.c_str(), _currentVersion);
+  logger_->log_info("Negotiate protocol version with destination port %s 
current version %d", _portIdStr.c_str(), _currentVersion);
 
   int ret = peer_->writeUTF(this->getResourceName());
 
@@ -106,39 +102,35 @@ bool 
Site2SiteClientProtocol::initiateResourceNegotiation() {
   }
   logger_->log_info("status code is %i", statusCode);
   switch (statusCode) {
-  case RESOURCE_OK:
-    logger_->log_info("Site2Site Protocol Negotiate protocol version OK");
-    return true;
-  case DIFFERENT_RESOURCE_VERSION:
-    uint32_t serverVersion;
-    ret = peer_->read(serverVersion);
-    if (ret <= 0) {
+    case RESOURCE_OK:
+      logger_->log_info("Site2Site Protocol Negotiate protocol version OK");
+      return true;
+    case DIFFERENT_RESOURCE_VERSION:
+      uint32_t serverVersion;
+      ret = peer_->read(serverVersion);
+      if (ret <= 0) {
+        // tearDown();
+        return false;
+      }
+      logger_->log_info("Site2Site Server Response asked for a different 
protocol version %d", serverVersion);
+      for (unsigned int i = (_currentVersionIndex + 1); i < 
sizeof(_supportedVersion) / sizeof(uint32_t); i++) {
+        if (serverVersion >= _supportedVersion[i]) {
+          _currentVersion = _supportedVersion[i];
+          _currentVersionIndex = i;
+          return initiateResourceNegotiation();
+        }
+      }
+      ret = -1;
       // tearDown();
       return false;
-    }
-    logger_->log_info(
-        "Site2Site Server Response asked for a different protocol version %d",
-        serverVersion);
-    for (unsigned int i = (_currentVersionIndex + 1);
-        i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) {
-      if (serverVersion >= _supportedVersion[i]) {
-        _currentVersion = _supportedVersion[i];
-        _currentVersionIndex = i;
-        return initiateResourceNegotiation();
-      }
-    }
-    ret = -1;
-    // tearDown();
-    return false;
-  case NEGOTIATED_ABORT:
-    logger_->log_info("Site2Site Negotiate protocol response ABORT");
-    ret = -1;
-    // tearDown();
-    return false;
-  default:
-    logger_->log_info("Negotiate protocol response unknown code %d",
-        statusCode);
-    return true;
+    case NEGOTIATED_ABORT:
+      logger_->log_info("Site2Site Negotiate protocol response ABORT");
+      ret = -1;
+      // tearDown();
+      return false;
+    default:
+      logger_->log_info("Negotiate protocol response unknown code %d", 
statusCode);
+      return true;
   }
 
   return true;
@@ -147,14 +139,11 @@ bool 
Site2SiteClientProtocol::initiateResourceNegotiation() {
 bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() {
   // Negotiate the version
   if (_peerState != HANDSHAKED) {
-    logger_->log_error(
-        "Site2Site peer state is not handshaked while 
initiateCodecResourceNegotiation");
+    logger_->log_error("Site2Site peer state is not handshaked while 
initiateCodecResourceNegotiation");
     return false;
   }
 
-  logger_->log_info(
-      "Negotiate Codec version with destination port %s current version %d",
-      _portIdStr.c_str(), _currentCodecVersion);
+  logger_->log_info("Negotiate Codec version with destination port %s current 
version %d", _portIdStr.c_str(), _currentCodecVersion);
 
   int ret = peer_->writeUTF(this->getCodecResourceName());
 
@@ -181,38 +170,35 @@ bool 
Site2SiteClientProtocol::initiateCodecResourceNegotiation() {
   }
 
   switch (statusCode) {
-  case RESOURCE_OK:
-    logger_->log_info("Site2Site Codec Negotiate version OK");
-    return true;
-  case DIFFERENT_RESOURCE_VERSION:
-    uint32_t serverVersion;
-    ret = peer_->read(serverVersion);
-    if (ret <= 0) {
+    case RESOURCE_OK:
+      logger_->log_info("Site2Site Codec Negotiate version OK");
+      return true;
+    case DIFFERENT_RESOURCE_VERSION:
+      uint32_t serverVersion;
+      ret = peer_->read(serverVersion);
+      if (ret <= 0) {
+        // tearDown();
+        return false;
+      }
+      logger_->log_info("Site2Site Server Response asked for a different codec 
version %d", serverVersion);
+      for (unsigned int i = (_currentCodecVersionIndex + 1); i < 
sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) {
+        if (serverVersion >= _supportedCodecVersion[i]) {
+          _currentCodecVersion = _supportedCodecVersion[i];
+          _currentCodecVersionIndex = i;
+          return initiateCodecResourceNegotiation();
+        }
+      }
+      ret = -1;
       // tearDown();
       return false;
-    }
-    logger_->log_info(
-        "Site2Site Server Response asked for a different codec version %d",
-        serverVersion);
-    for (unsigned int i = (_currentCodecVersionIndex + 1);
-        i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) {
-      if (serverVersion >= _supportedCodecVersion[i]) {
-        _currentCodecVersion = _supportedCodecVersion[i];
-        _currentCodecVersionIndex = i;
-        return initiateCodecResourceNegotiation();
-      }
-    }
-    ret = -1;
-    // tearDown();
-    return false;
-  case NEGOTIATED_ABORT:
-    logger_->log_info("Site2Site Codec Negotiate response ABORT");
-    ret = -1;
-    // tearDown();
-    return false;
-  default:
-    logger_->log_info("Negotiate Codec response unknown code %d", statusCode);
-    return true;
+    case NEGOTIATED_ABORT:
+      logger_->log_info("Site2Site Codec Negotiate response ABORT");
+      ret = -1;
+      // tearDown();
+      return false;
+    default:
+      logger_->log_info("Negotiate Codec response unknown code %d", 
statusCode);
+      return true;
   }
 
   return true;
@@ -220,13 +206,10 @@ bool 
Site2SiteClientProtocol::initiateCodecResourceNegotiation() {
 
 bool Site2SiteClientProtocol::handShake() {
   if (_peerState != ESTABLISHED) {
-    logger_->log_error(
-        "Site2Site peer state is not established while handshake");
+    logger_->log_error("Site2Site peer state is not established while 
handshake");
     return false;
   }
-  logger_->log_info(
-      "Site2Site Protocol Perform hand shake with destination port %s",
-      _portIdStr.c_str());
+  logger_->log_info("Site2Site Protocol Perform hand shake with destination 
port %s", _portIdStr.c_str());
   uuid_t uuid;
   // Generate the global UUID for the com identify
   uuid_generate(uuid);
@@ -244,18 +227,14 @@ bool Site2SiteClientProtocol::handShake() {
   std::map<std::string, std::string> properties;
   properties[HandShakePropertyStr[GZIP]] = "false";
   properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr;
-  properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(
-      this->_timeOut);
+  properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = 
std::to_string(this->_timeOut);
   if (this->_currentVersion >= 5) {
     if (this->_batchCount > 0)
-      properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string(
-          this->_batchCount);
+      properties[HandShakePropertyStr[BATCH_COUNT]] = 
std::to_string(this->_batchCount);
     if (this->_batchSize > 0)
-      properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string(
-          this->_batchSize);
+      properties[HandShakePropertyStr[BATCH_SIZE]] = 
std::to_string(this->_batchSize);
     if (this->_batchDuration > 0)
-      properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string(
-          this->_batchDuration);
+      properties[HandShakePropertyStr[BATCH_DURATION]] = 
std::to_string(this->_batchDuration);
   }
 
   if (_currentVersion >= 3) {
@@ -285,8 +264,7 @@ bool Site2SiteClientProtocol::handShake() {
       // tearDown();
       return false;
     }
-    logger_->log_info("Site2Site Protocol Send handshake properties %s %s",
-                      it->first.c_str(), it->second.c_str());
+    logger_->log_info("Site2Site Protocol Send handshake properties %s %s", 
it->first.c_str(), it->second.c_str());
   }
 
   RespondCode code;
@@ -307,16 +285,14 @@ bool Site2SiteClientProtocol::handShake() {
     case PORT_NOT_IN_VALID_STATE:
     case UNKNOWN_PORT:
     case PORTS_DESTINATION_FULL:
-      logger_->log_error(
-          "Site2Site HandShake Failed because destination port is either 
invalid or full");
+      logger_->log_error("Site2Site HandShake Failed because destination port 
is either invalid or full");
       ret = -1;
       /*
        peer_->yield();
        tearDown(); */
       return false;
     default:
-      logger_->log_info("HandShake Failed because of unknown respond code %d",
-                        code);
+      logger_->log_info("HandShake Failed because of unknown respond code %d", 
code);
       ret = -1;
       /*
        peer_->yield();
@@ -368,8 +344,7 @@ int Site2SiteClientProtocol::readRequestType(RequestType 
&type) {
   return -1;
 }
 
-int Site2SiteClientProtocol::readRespond(RespondCode &code,
-                                         std::string &message) {
+int Site2SiteClientProtocol::readRespond(RespondCode &code, std::string 
&message) {
   uint8_t firstByte;
 
   int ret = peer_->read(firstByte);
@@ -407,8 +382,7 @@ int Site2SiteClientProtocol::readRespond(RespondCode &code,
   return 3 + message.size();
 }
 
-int Site2SiteClientProtocol::writeRespond(RespondCode code,
-                                          std::string message) {
+int Site2SiteClientProtocol::writeRespond(RespondCode code, std::string 
message) {
   RespondCodeContext *resCode = this->getRespondCodeContext(code);
 
   if (resCode == NULL) {
@@ -440,14 +414,11 @@ int Site2SiteClientProtocol::writeRespond(RespondCode 
code,
 
 bool Site2SiteClientProtocol::negotiateCodec() {
   if (_peerState != HANDSHAKED) {
-    logger_->log_error(
-        "Site2Site peer state is not handshaked while negotiate codec");
+    logger_->log_error("Site2Site peer state is not handshaked while negotiate 
codec");
     return false;
   }
 
-  logger_->log_info(
-      "Site2Site Protocol Negotiate Codec with destination port %s",
-      _portIdStr.c_str());
+  logger_->log_info("Site2Site Protocol Negotiate Codec with destination port 
%s", _portIdStr.c_str());
 
   int status = this->writeRequestType(NEGOTIATE_FLOWFILE_CODEC);
 
@@ -467,8 +438,7 @@ bool Site2SiteClientProtocol::negotiateCodec() {
     return false;
   }
 
-  logger_->log_info(
-      "Site2Site Codec Completed and move to READY state for data transfer");
+  logger_->log_info("Site2Site Codec Completed and move to READY state for 
data transfer");
   _peerState = READY;
 
   return true;
@@ -490,8 +460,7 @@ bool Site2SiteClientProtocol::bootstrap() {
   }
 }
 
-Transaction* Site2SiteClientProtocol::createTransaction(
-    std::string &transactionID, TransferDirection direction) {
+Transaction* Site2SiteClientProtocol::createTransaction(std::string 
&transactionID, TransferDirection direction) {
   int ret;
   bool dataAvailable;
   Transaction *transaction = NULL;
@@ -522,8 +491,7 @@ Transaction* Site2SiteClientProtocol::createTransaction(
       return NULL;
     }
 
-    org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream(
-        peer_.get());
+    org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> 
crcstream(peer_.get());
     switch (code) {
       case MORE_DATA:
         dataAvailable = true;
@@ -532,8 +500,7 @@ Transaction* Site2SiteClientProtocol::createTransaction(
         _transactionMap[transaction->getUUIDStr()] = transaction;
         transactionID = transaction->getUUIDStr();
         transaction->setDataAvailable(dataAvailable);
-        logger_->log_info("Site2Site create transaction %s",
-                          transaction->getUUIDStr().c_str());
+        logger_->log_info("Site2Site create transaction %s", 
transaction->getUUIDStr().c_str());
         return transaction;
       case NO_MORE_DATA:
         dataAvailable = false;
@@ -542,12 +509,10 @@ Transaction* Site2SiteClientProtocol::createTransaction(
         _transactionMap[transaction->getUUIDStr()] = transaction;
         transactionID = transaction->getUUIDStr();
         transaction->setDataAvailable(dataAvailable);
-        logger_->log_info("Site2Site create transaction %s",
-                          transaction->getUUIDStr().c_str());
+        logger_->log_info("Site2Site create transaction %s", 
transaction->getUUIDStr().c_str());
         return transaction;
       default:
-        logger_->log_info(
-            "Site2Site got unexpected response %d when asking for data", code);
+        logger_->log_info("Site2Site got unexpected response %d when asking 
for data", code);
         // tearDown();
         return NULL;
     }
@@ -558,20 +523,17 @@ Transaction* Site2SiteClientProtocol::createTransaction(
       // tearDown();
       return NULL;
     } else {
-      org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream(
-          peer_.get());
+      org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> 
crcstream(peer_.get());
       transaction = new Transaction(direction, crcstream);
       _transactionMap[transaction->getUUIDStr()] = transaction;
       transactionID = transaction->getUUIDStr();
-      logger_->log_info("Site2Site create transaction %s",
-                        transaction->getUUIDStr().c_str());
+      logger_->log_info("Site2Site create transaction %s", 
transaction->getUUIDStr().c_str());
       return transaction;
     }
   }
 }
 
-bool Site2SiteClientProtocol::receive(std::string transactionID,
-                                      DataPacket *packet, bool &eof) {
+bool Site2SiteClientProtocol::receive(std::string transactionID, DataPacket 
*packet, bool &eof) {
   int ret;
   Transaction *transaction = NULL;
 
@@ -583,8 +545,7 @@ bool Site2SiteClientProtocol::receive(std::string 
transactionID,
     return false;
   }
 
-  std::map<std::string, Transaction *>::iterator it =
-      this->_transactionMap.find(transactionID);
+  std::map<std::string, Transaction *>::iterator it = 
this->_transactionMap.find(transactionID);
 
   if (it == _transactionMap.end()) {
     return false;
@@ -592,17 +553,13 @@ bool Site2SiteClientProtocol::receive(std::string 
transactionID,
     transaction = it->second;
   }
 
-  if (transaction->getState() != TRANSACTION_STARTED
-      && transaction->getState() != DATA_EXCHANGED) {
-    logger_->log_info(
-        "Site2Site transaction %s is not at started or exchanged state",
-        transactionID.c_str());
+  if (transaction->getState() != TRANSACTION_STARTED && 
transaction->getState() != DATA_EXCHANGED) {
+    logger_->log_info("Site2Site transaction %s is not at started or exchanged 
state", transactionID.c_str());
     return false;
   }
 
   if (transaction->getDirection() != RECEIVE) {
-    logger_->log_info("Site2Site transaction %s direction is wrong",
-                      transactionID.c_str());
+    logger_->log_info("Site2Site transaction %s direction is wrong", 
transactionID.c_str());
     return false;
   }
 
@@ -622,19 +579,13 @@ bool Site2SiteClientProtocol::receive(std::string 
transactionID,
       return false;
     }
     if (code == CONTINUE_TRANSACTION) {
-      logger_->log_info(
-          "Site2Site transaction %s peer indicate continue transaction",
-          transactionID.c_str());
+      logger_->log_info("Site2Site transaction %s peer indicate continue 
transaction", transactionID.c_str());
       transaction->_dataAvailable = true;
     } else if (code == FINISH_TRANSACTION) {
-      logger_->log_info(
-          "Site2Site transaction %s peer indicate finish transaction",
-          transactionID.c_str());
+      logger_->log_info("Site2Site transaction %s peer indicate finish 
transaction", transactionID.c_str());
       transaction->_dataAvailable = false;
     } else {
-      logger_->log_info(
-          "Site2Site transaction %s peer indicate wrong respond code %d",
-          transactionID.c_str(), code);
+      logger_->log_info("Site2Site transaction %s peer indicate wrong respond 
code %d", transactionID.c_str(), code);
       return false;
     }
   }
@@ -664,9 +615,7 @@ bool Site2SiteClientProtocol::receive(std::string 
transactionID,
       return false;
     }
     packet->_attributes[key] = value;
-    logger_->log_info(
-        "Site2Site transaction %s receives attribute key %s value %s",
-        transactionID.c_str(), key.c_str(), value.c_str());
+    logger_->log_info("Site2Site transaction %s receives attribute key %s 
value %s", transactionID.c_str(), key.c_str(), value.c_str());
   }
 
   uint64_t len;
@@ -679,17 +628,12 @@ bool Site2SiteClientProtocol::receive(std::string 
transactionID,
   transaction->_transfers++;
   transaction->_state = DATA_EXCHANGED;
   transaction->_bytes += len;
-  logger_->log_info(
-      "Site2Site transaction %s receives flow record %d, total length %d",
-      transactionID.c_str(), transaction->_transfers, transaction->_bytes);
+  logger_->log_info("Site2Site transaction %s receives flow record %d, total 
length %d", transactionID.c_str(), transaction->_transfers, 
transaction->_bytes);
 
   return true;
 }
 
-bool Site2SiteClientProtocol::send(std::string transactionID,
-                                   DataPacket *packet,
-                                   std::shared_ptr<FlowFileRecord> flowFile,
-                                   core::ProcessSession *session) {
+bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket 
*packet, std::shared_ptr<FlowFileRecord> flowFile, core::ProcessSession 
*session) {
   int ret;
   Transaction *transaction = NULL;
 
@@ -701,8 +645,7 @@ bool Site2SiteClientProtocol::send(std::string 
transactionID,
     return false;
   }
 
-  std::map<std::string, Transaction *>::iterator it =
-      this->_transactionMap.find(transactionID);
+  std::map<std::string, Transaction *>::iterator it = 
this->_transactionMap.find(transactionID);
 
   if (it == _transactionMap.end()) {
     return false;
@@ -710,17 +653,13 @@ bool Site2SiteClientProtocol::send(std::string 
transactionID,
     transaction = it->second;
   }
 
-  if (transaction->getState() != TRANSACTION_STARTED
-      && transaction->getState() != DATA_EXCHANGED) {
-    logger_->log_info(
-        "Site2Site transaction %s is not at started or exchanged state",
-        transactionID.c_str());
+  if (transaction->getState() != TRANSACTION_STARTED && 
transaction->getState() != DATA_EXCHANGED) {
+    logger_->log_info("Site2Site transaction %s is not at started or exchanged 
state", transactionID.c_str());
     return false;
   }
 
   if (transaction->getDirection() != SEND) {
-    logger_->log_info("Site2Site transaction %s direction is wrong",
-                      transactionID.c_str());
+    logger_->log_info("Site2Site transaction %s direction is wrong", 
transactionID.c_str());
     return false;
   }
 
@@ -739,8 +678,7 @@ bool Site2SiteClientProtocol::send(std::string 
transactionID,
   }
 
   std::map<std::string, std::string>::iterator itAttribute;
-  for (itAttribute = packet->_attributes.begin();
-      itAttribute != packet->_attributes.end(); itAttribute++) {
+  for (itAttribute = packet->_attributes.begin(); itAttribute != 
packet->_attributes.end(); itAttribute++) {
     ret = transaction->getStream().writeUTF(itAttribute->first, true);
 
     if (ret <= 0) {
@@ -750,9 +688,7 @@ bool Site2SiteClientProtocol::send(std::string 
transactionID,
     if (ret <= 0) {
       return false;
     }
-    logger_->log_info("Site2Site transaction %s send attribute key %s value 
%s",
-                      transactionID.c_str(), itAttribute->first.c_str(),
-                      itAttribute->second.c_str());
+    logger_->log_info("Site2Site transaction %s send attribute key %s value 
%s", transactionID.c_str(), itAttribute->first.c_str(), 
itAttribute->second.c_str());
   }
 
   uint64_t len = 0;
@@ -777,8 +713,7 @@ bool Site2SiteClientProtocol::send(std::string 
transactionID,
       return false;
     }
 
-    ret = transaction->getStream().writeData(
-        reinterpret_cast<uint8_t *> (const_cast<char*> 
(packet->payload_.c_str())), len);
+    ret = transaction->getStream().writeData(reinterpret_cast<uint8_t 
*>(const_cast<char*>(packet->payload_.c_str())), len);
     if (ret != len) {
       return false;
     }
@@ -788,15 +723,12 @@ bool Site2SiteClientProtocol::send(std::string 
transactionID,
   transaction->_transfers++;
   transaction->_state = DATA_EXCHANGED;
   transaction->_bytes += len;
-  logger_->log_info(
-      "Site2Site transaction %s send flow record %d, total length %d",
-      transactionID.c_str(), transaction->_transfers, transaction->_bytes);
+  logger_->log_info("Site2Site transaction %s send flow record %d, total 
length %d", transactionID.c_str(), transaction->_transfers, 
transaction->_bytes);
 
   return true;
 }
 
-void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
-                                               core::ProcessSession *session) {
+void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, 
core::ProcessSession *session) {
   uint64_t bytes = 0;
   int transfers = 0;
   Transaction *transaction = NULL;
@@ -808,8 +740,7 @@ void 
Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
   if (_peerState != READY) {
     context->yield();
     tearDown();
-    throw Exception(SITE2SITE_EXCEPTION,
-                    "Can not establish handshake with peer");
+    throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with 
peer");
     return;
   }
 
@@ -840,8 +771,7 @@ void 
Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
         // transaction done
         break;
       }
-      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<
-          FlowFileRecord>(session->create());
+      std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->create());
 
       if (!flowFile) {
         throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed");
@@ -849,8 +779,7 @@ void 
Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
       }
       std::map<std::string, std::string>::iterator it;
       std::string sourceIdentifier;
-      for (it = packet._attributes.begin(); it != packet._attributes.end();
-          it++) {
+      for (it = packet._attributes.begin(); it != packet._attributes.end(); 
it++) {
         if (it->first == FlowAttributeKey(UUID))
           sourceIdentifier = it->second;
         flowFile->addAttribute(it->first, it->second);
@@ -867,11 +796,8 @@ void 
Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
       core::Relationship relation;  // undefined relationship
       uint64_t endTime = getTimeMillis();
       std::string transitUri = peer_->getURL() + "/" + sourceIdentifier;
-      std::string details = "urn:nifi:" + sourceIdentifier + "Remote Host="
-          + peer_->getHostName();
-      session->getProvenanceReporter()->receive(flowFile, transitUri,
-                                                sourceIdentifier, details,
-                                                endTime - startTime);
+      std::string details = "urn:nifi:" + sourceIdentifier + "Remote Host=" + 
peer_->getHostName();
+      session->getProvenanceReporter()->receive(flowFile, transitUri, 
sourceIdentifier, details, endTime - startTime);
       session->transfer(flowFile, relation);
       // receive the transfer for the flow record
       bytes += packet._size;
@@ -886,9 +812,7 @@ void 
Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
       throw Exception(SITE2SITE_EXCEPTION, "Complete Transaction Failed");
       return;
     }
-    logger_->log_info(
-        "Site2Site transaction %s successfully receive flow record %d, content 
bytes %d",
-        transactionID.c_str(), transfers, bytes);
+    logger_->log_info("Site2Site transaction %s successfully receive flow 
record %d, content bytes %d", transactionID.c_str(), transfers, bytes);
     // we yield the receive if we did not get anything
     if (transfers == 0)
       context->yield();
@@ -904,8 +828,7 @@ void 
Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context,
       deleteTransaction(transactionID);
     context->yield();
     tearDown();
-    logger_->log_debug(
-        "Caught Exception during Site2SiteClientProtocol::receiveFlowFiles");
+    logger_->log_debug("Caught Exception during 
Site2SiteClientProtocol::receiveFlowFiles");
     throw;
   }
 
@@ -926,8 +849,7 @@ bool Site2SiteClientProtocol::confirm(std::string 
transactionID) {
     return false;
   }
 
-  std::map<std::string, Transaction *>::iterator it =
-      this->_transactionMap.find(transactionID);
+  std::map<std::string, Transaction *>::iterator it = 
this->_transactionMap.find(transactionID);
 
   if (it == _transactionMap.end()) {
     return false;
@@ -935,9 +857,7 @@ bool Site2SiteClientProtocol::confirm(std::string 
transactionID) {
     transaction = it->second;
   }
 
-  if (transaction->getState() == TRANSACTION_STARTED
-      && !transaction->isDataAvailable()
-      && transaction->getDirection() == RECEIVE) {
+  if (transaction->getState() == TRANSACTION_STARTED && 
!transaction->isDataAvailable() && transaction->getDirection() == RECEIVE) {
     transaction->_state = TRANSACTION_CONFIRMED;
     return true;
   }
@@ -957,8 +877,7 @@ bool Site2SiteClientProtocol::confirm(std::string 
transactionID) {
     // time window involved in the entire transaction, it is reduced to a 
simple round-trip conversation.
     int64_t crcValue = transaction->getCRC();
     std::string crc = std::to_string(crcValue);
-    logger_->log_info("Site2Site Send confirm with CRC %d to transaction %s",
-                      transaction->getCRC(), transactionID.c_str());
+    logger_->log_info("Site2Site Send confirm with CRC %d to transaction %s", 
transaction->getCRC(), transactionID.c_str());
     ret = writeRespond(CONFIRM_TRANSACTION, crc);
     if (ret <= 0)
       return false;
@@ -969,25 +888,21 @@ bool Site2SiteClientProtocol::confirm(std::string 
transactionID) {
       return false;
 
     if (code == CONFIRM_TRANSACTION) {
-      logger_->log_info("Site2Site transaction %s peer confirm transaction",
-                        transactionID.c_str());
+      logger_->log_info("Site2Site transaction %s peer confirm transaction", 
transactionID.c_str());
       transaction->_state = TRANSACTION_CONFIRMED;
       return true;
     } else if (code == BAD_CHECKSUM) {
-      logger_->log_info("Site2Site transaction %s peer indicate bad checksum",
-                        transactionID.c_str());
+      logger_->log_info("Site2Site transaction %s peer indicate bad checksum", 
transactionID.c_str());
       /*
        transaction->_state = TRANSACTION_CONFIRMED;
        return true; */
       return false;
     } else {
-      logger_->log_info("Site2Site transaction %s peer unknown respond code 
%d",
-                        transactionID.c_str(), code);
+      logger_->log_info("Site2Site transaction %s peer unknown respond code 
%d", transactionID.c_str(), code);
       return false;
     }
   } else {
-    logger_->log_info("Site2Site Send FINISH TRANSACTION for transaction %s",
-                      transactionID.c_str());
+    logger_->log_info("Site2Site Send FINISH TRANSACTION for transaction %s", 
transactionID.c_str());
     ret = writeRespond(FINISH_TRANSACTION, "FINISH_TRANSACTION");
     if (ret <= 0)
       return false;
@@ -999,23 +914,19 @@ bool Site2SiteClientProtocol::confirm(std::string 
transactionID) {
 
     // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 
'Confirm Transaction' response
     if (code == CONFIRM_TRANSACTION) {
-      logger_->log_info(
-          "Site2Site transaction %s peer confirm transaction with CRC %s",
-          transactionID.c_str(), message.c_str());
+      logger_->log_info("Site2Site transaction %s peer confirm transaction 
with CRC %s", transactionID.c_str(), message.c_str());
       if (this->_currentVersion > 3) {
         int64_t crcValue = transaction->getCRC();
         std::string crc = std::to_string(crcValue);
         if (message == crc) {
-          logger_->log_info("Site2Site transaction %s CRC matched",
-                            transactionID.c_str());
+          logger_->log_info("Site2Site transaction %s CRC matched", 
transactionID.c_str());
           ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
           if (ret <= 0)
             return false;
           transaction->_state = TRANSACTION_CONFIRMED;
           return true;
         } else {
-          logger_->log_info("Site2Site transaction %s CRC not matched %s",
-                            transactionID.c_str(), crc.c_str());
+          logger_->log_info("Site2Site transaction %s CRC not matched %s", 
transactionID.c_str(), crc.c_str());
           ret = writeRespond(BAD_CHECKSUM, "BAD_CHECKSUM");
           /*
            ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION");
@@ -1032,8 +943,7 @@ bool Site2SiteClientProtocol::confirm(std::string 
transactionID) {
       transaction->_state = TRANSACTION_CONFIRMED;
       return true;
     } else {
-      logger_->log_info("Site2Site transaction %s peer unknown respond code 
%d",
-                        transactionID.c_str(), code);
+      logger_->log_info("Site2Site transaction %s peer unknown respond code 
%d", transactionID.c_str(), code);
       return false;
     }
     return false;
@@ -1047,8 +957,7 @@ void Site2SiteClientProtocol::cancel(std::string 
transactionID) {
     return;
   }
 
-  std::map<std::string, Transaction *>::iterator it =
-      this->_transactionMap.find(transactionID);
+  std::map<std::string, Transaction *>::iterator it = 
this->_transactionMap.find(transactionID);
 
   if (it == _transactionMap.end()) {
     return;
@@ -1056,9 +965,7 @@ void Site2SiteClientProtocol::cancel(std::string 
transactionID) {
     transaction = it->second;
   }
 
-  if (transaction->getState() == TRANSACTION_CANCELED
-      || transaction->getState() == TRANSACTION_COMPLETED
-      || transaction->getState() == TRANSACTION_ERROR) {
+  if (transaction->getState() == TRANSACTION_CANCELED || 
transaction->getState() == TRANSACTION_COMPLETED || transaction->getState() == 
TRANSACTION_ERROR) {
     return;
   }
 
@@ -1072,8 +979,7 @@ void Site2SiteClientProtocol::cancel(std::string 
transactionID) {
 void Site2SiteClientProtocol::deleteTransaction(std::string transactionID) {
   Transaction *transaction = NULL;
 
-  std::map<std::string, Transaction *>::iterator it =
-      this->_transactionMap.find(transactionID);
+  std::map<std::string, Transaction *>::iterator it = 
this->_transactionMap.find(transactionID);
 
   if (it == _transactionMap.end()) {
     return;
@@ -1081,8 +987,7 @@ void 
Site2SiteClientProtocol::deleteTransaction(std::string transactionID) {
     transaction = it->second;
   }
 
-  logger_->log_info("Site2Site delete transaction %s",
-                    transaction->getUUIDStr().c_str());
+  logger_->log_info("Site2Site delete transaction %s", 
transaction->getUUIDStr().c_str());
   delete transaction;
   _transactionMap.erase(transactionID);
 }
@@ -1090,8 +995,7 @@ void 
Site2SiteClientProtocol::deleteTransaction(std::string transactionID) {
 void Site2SiteClientProtocol::error(std::string transactionID) {
   Transaction *transaction = NULL;
 
-  std::map<std::string, Transaction *>::iterator it =
-      this->_transactionMap.find(transactionID);
+  std::map<std::string, Transaction *>::iterator it = 
this->_transactionMap.find(transactionID);
 
   if (it == _transactionMap.end()) {
     return;
@@ -1117,8 +1021,7 @@ bool Site2SiteClientProtocol::complete(std::string 
transactionID) {
     return false;
   }
 
-  std::map<std::string, Transaction *>::iterator it =
-      this->_transactionMap.find(transactionID);
+  std::map<std::string, Transaction *>::iterator it = 
this->_transactionMap.find(transactionID);
 
   if (it == _transactionMap.end()) {
     return false;
@@ -1135,8 +1038,7 @@ bool Site2SiteClientProtocol::complete(std::string 
transactionID) {
       transaction->_state = TRANSACTION_COMPLETED;
       return true;
     } else {
-      logger_->log_info("Site2Site transaction %s send finished",
-                        transactionID.c_str());
+      logger_->log_info("Site2Site transaction %s send finished", 
transactionID.c_str());
       ret = this->writeRespond(TRANSACTION_FINISHED, "Finished");
       if (ret <= 0) {
         return false;
@@ -1156,22 +1058,18 @@ bool Site2SiteClientProtocol::complete(std::string 
transactionID) {
       return false;
 
     if (code == TRANSACTION_FINISHED) {
-      logger_->log_info("Site2Site transaction %s peer finished transaction",
-                        transactionID.c_str());
+      logger_->log_info("Site2Site transaction %s peer finished transaction", 
transactionID.c_str());
       transaction->_state = TRANSACTION_COMPLETED;
       return true;
     } else {
-      logger_->log_info("Site2Site transaction %s peer unknown respond code 
%d",
-                        transactionID.c_str(), code);
+      logger_->log_info("Site2Site transaction %s peer unknown respond code 
%d", transactionID.c_str(), code);
       return false;
     }
   }
 }
 
-void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context,
-                                                core::ProcessSession *session) 
{
-  std::shared_ptr<FlowFileRecord> flow =
-      std::static_pointer_cast<FlowFileRecord>(session->get());
+void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, 
core::ProcessSession *session) {
+  std::shared_ptr<FlowFileRecord> flow = 
std::static_pointer_cast<FlowFileRecord>(session->get());
 
   Transaction *transaction = NULL;
 
@@ -1185,8 +1083,7 @@ void 
Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context,
   if (_peerState != READY) {
     context->yield();
     tearDown();
-    throw Exception(SITE2SITE_EXCEPTION,
-                    "Can not establish handshake with peer");
+    throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with 
peer");
     return;
   }
 
@@ -1214,14 +1111,11 @@ void 
Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context,
         throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
         return;
       }
-      logger_->log_info("Site2Site transaction %s send flow record %s",
-                        transactionID.c_str(), flow->getUUIDStr().c_str());
+      logger_->log_info("Site2Site transaction %s send flow record %s", 
transactionID.c_str(), flow->getUUIDStr().c_str());
       uint64_t endTime = getTimeMillis();
       std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr();
-      std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host="
-          + peer_->getHostName();
-      session->getProvenanceReporter()->send(flow, transitUri, details,
-                                             endTime - startTime, false);
+      std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" 
+ peer_->getHostName();
+      session->getProvenanceReporter()->send(flow, transitUri, details, 
endTime - startTime, false);
       session->remove(flow);
 
       uint64_t transferNanos = getTimeNano() - startSendingNanos;
@@ -1243,9 +1137,7 @@ void 
Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context,
       throw Exception(SITE2SITE_EXCEPTION, "Complete Failed");
       return;
     }
-    logger_->log_info(
-        "Site2Site transaction %s successfully send flow record %d, content 
bytes %d",
-        transactionID.c_str(), transaction->_transfers, transaction->_bytes);
+    logger_->log_info("Site2Site transaction %s successfully send flow record 
%d, content bytes %d", transactionID.c_str(), transaction->_transfers, 
transaction->_bytes);
   } catch (std::exception &exception) {
     if (transaction)
       deleteTransaction(transactionID);
@@ -1258,8 +1150,7 @@ void 
Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context,
       deleteTransaction(transactionID);
     context->yield();
     tearDown();
-    logger_->log_debug(
-        "Caught Exception during Site2SiteClientProtocol::transferFlowFiles");
+    logger_->log_debug("Caught Exception during 
Site2SiteClientProtocol::transferFlowFiles");
     throw;
   }
 
@@ -1268,9 +1159,7 @@ void 
Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context,
   return;
 }
 
-void Site2SiteClientProtocol::transferString(core::ProcessContext *context,
-    core::ProcessSession *session, std::string &payload,
-    std::map<std::string, std::string> attributes) {
+void Site2SiteClientProtocol::transferString(core::ProcessContext *context, 
core::ProcessSession *session, std::string &payload, std::map<std::string, 
std::string> attributes) {
   Transaction *transaction = NULL;
 
   if (payload.length() <= 0)
@@ -1283,8 +1172,7 @@ void 
Site2SiteClientProtocol::transferString(core::ProcessContext *context,
   if (_peerState != READY) {
     context->yield();
     tearDown();
-    throw Exception(SITE2SITE_EXCEPTION,
-        "Can not establish handshake with peer");
+    throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with 
peer");
     return;
   }
 
@@ -1306,8 +1194,7 @@ void 
Site2SiteClientProtocol::transferString(core::ProcessContext *context,
       throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
       return;
     }
-    logger_->log_info("Site2Site transaction %s send bytes length %d",
-        transactionID.c_str(), payload.length());
+    logger_->log_info("Site2Site transaction %s send bytes length %d", 
transactionID.c_str(), payload.length());
 
     if (!confirm(transactionID)) {
       throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed");
@@ -1317,9 +1204,7 @@ void 
Site2SiteClientProtocol::transferString(core::ProcessContext *context,
       throw Exception(SITE2SITE_EXCEPTION, "Complete Failed");
       return;
     }
-    logger_->log_info(
-        "Site2Site transaction %s successfully send flow record %d, content 
bytes %d",
-        transactionID.c_str(), transaction->_transfers, transaction->_bytes);
+    logger_->log_info("Site2Site transaction %s successfully send flow record 
%d, content bytes %d", transactionID.c_str(), transaction->_transfers, 
transaction->_bytes);
   } catch (std::exception &exception) {
     if (transaction)
       deleteTransaction(transactionID);
@@ -1332,8 +1217,7 @@ void 
Site2SiteClientProtocol::transferString(core::ProcessContext *context,
       deleteTransaction(transactionID);
     context->yield();
     tearDown();
-    logger_->log_debug(
-        "Caught Exception during Site2SiteClientProtocol::transferBytes");
+    logger_->log_debug("Caught Exception during 
Site2SiteClientProtocol::transferBytes");
     throw;
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/Site2SitePeer.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/Site2SitePeer.cpp b/libminifi/src/Site2SitePeer.cpp
index 551d466..7c46564 100644
--- a/libminifi/src/Site2SitePeer.cpp
+++ b/libminifi/src/Site2SitePeer.cpp
@@ -44,9 +44,7 @@ bool Site2SitePeer::Open() {
 
   uint16_t data_size = sizeof MAGIC_BYTES;
 
-  if (stream_->writeData(
-      reinterpret_cast<uint8_t *>(const_cast<char*>(MAGIC_BYTES)), data_size)
-      != data_size) {
+  if (stream_->writeData(reinterpret_cast<uint8_t 
*>(const_cast<char*>(MAGIC_BYTES)), data_size) != data_size) {
     return false;
   }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/ThreadedSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp 
b/libminifi/src/ThreadedSchedulingAgent.cpp
index 7e9bb03..46a4710 100644
--- a/libminifi/src/ThreadedSchedulingAgent.cpp
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -35,103 +35,77 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-void ThreadedSchedulingAgent::schedule(
-    std::shared_ptr<core::Processor> processor) {
+void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> 
processor) {
   std::lock_guard<std::mutex> lock(mutex_);
 
   admin_yield_duration_ = 0;
   std::string yieldValue;
 
-  if (configure_->get(Configure::nifi_administrative_yield_duration,
-                      yieldValue)) {
+  if (configure_->get(Configure::nifi_administrative_yield_duration, 
yieldValue)) {
     core::TimeUnit unit;
-    if (core::Property::StringToTime(yieldValue, admin_yield_duration_, unit)
-        && core::Property::ConvertTimeUnitToMS(admin_yield_duration_, unit,
-                                               admin_yield_duration_)) {
-      logger_->log_debug("nifi_administrative_yield_duration: [%d] ms",
-                         admin_yield_duration_);
+    if (core::Property::StringToTime(yieldValue, admin_yield_duration_, unit) 
&& core::Property::ConvertTimeUnitToMS(admin_yield_duration_, unit, 
admin_yield_duration_)) {
+      logger_->log_debug("nifi_administrative_yield_duration: [%d] ms", 
admin_yield_duration_);
     }
   }
 
   bored_yield_duration_ = 0;
   if (configure_->get(Configure::nifi_bored_yield_duration, yieldValue)) {
     core::TimeUnit unit;
-    if (core::Property::StringToTime(yieldValue, bored_yield_duration_, unit)
-        && core::Property::ConvertTimeUnitToMS(bored_yield_duration_, unit,
-                                               bored_yield_duration_)) {
-      logger_->log_debug("nifi_bored_yield_duration: [%d] ms",
-                         bored_yield_duration_);
+    if (core::Property::StringToTime(yieldValue, bored_yield_duration_, unit) 
&& core::Property::ConvertTimeUnitToMS(bored_yield_duration_, unit, 
bored_yield_duration_)) {
+      logger_->log_debug("nifi_bored_yield_duration: [%d] ms", 
bored_yield_duration_);
     }
   }
 
   if (processor->getScheduledState() != core::RUNNING) {
-    logger_->log_info(
-        "Can not schedule threads for processor %s because it is not running",
-        processor->getName().c_str());
+    logger_->log_info("Can not schedule threads for processor %s because it is 
not running", processor->getName().c_str());
     return;
   }
 
-  std::map<std::string, std::vector<std::thread *>>::iterator it =
-      _threads.find(processor->getUUIDStr());
+  std::map<std::string, std::vector<std::thread *>>::iterator it = 
_threads.find(processor->getUUIDStr());
   if (it != _threads.end()) {
-    logger_->log_info(
-        "Can not schedule threads for processor %s because there are existing 
threads running");
+    logger_->log_info("Can not schedule threads for processor %s because there 
are existing threads running");
     return;
   }
 
   core::ProcessorNode processor_node(processor);
-  auto processContext = std::make_shared<core::ProcessContext>(
-      processor_node, controller_service_provider_, repo_);
-  auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(
-      processContext.get());
+  auto processContext = std::make_shared<core::ProcessContext>(processor_node, 
controller_service_provider_, repo_);
+  auto sessionFactory = 
std::make_shared<core::ProcessSessionFactory>(processContext.get());
 
   processor->onSchedule(processContext.get(), sessionFactory.get());
 
   std::vector<std::thread *> threads;
   for (int i = 0; i < processor->getMaxConcurrentTasks(); i++) {
     ThreadedSchedulingAgent *agent = this;
-    std::thread *thread = new std::thread(
-        [agent, processor, processContext, sessionFactory] () {
-          agent->run(processor, processContext.get(), sessionFactory.get());
-        });
+    std::thread *thread = new std::thread([agent, processor, processContext, 
sessionFactory] () {
+      agent->run(processor, processContext.get(), sessionFactory.get());
+    });
     thread->detach();
     threads.push_back(thread);
-    logger_->log_info("Scheduled thread %d running for process %s",
-                      thread->get_id(), processor->getName().c_str());
+    logger_->log_info("Scheduled thread %d running for process %s", 
thread->get_id(), processor->getName().c_str());
   }
   _threads[processor->getUUIDStr().c_str()] = threads;
 
   return;
 }
 
-void ThreadedSchedulingAgent::unschedule(
-    std::shared_ptr<core::Processor> processor) {
+void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> 
processor) {
   std::lock_guard<std::mutex> lock(mutex_);
-  logger_->log_info("Shutting down threads for processor %s/%s",
-                    processor->getName().c_str(),
-                    processor->getUUIDStr().c_str());
+  logger_->log_info("Shutting down threads for processor %s/%s", 
processor->getName().c_str(), processor->getUUIDStr().c_str());
 
   if (processor->getScheduledState() != core::RUNNING) {
-    logger_->log_info(
-        "Cannot unschedule threads for processor %s because it is not running",
-        processor->getName().c_str());
+    logger_->log_info("Cannot unschedule threads for processor %s because it 
is not running", processor->getName().c_str());
     return;
   }
 
-  std::map<std::string, std::vector<std::thread *>>::iterator it =
-      _threads.find(processor->getUUIDStr());
+  std::map<std::string, std::vector<std::thread *>>::iterator it = 
_threads.find(processor->getUUIDStr());
 
   if (it == _threads.end()) {
-    logger_->log_info(
-        "Cannot unschedule threads for processor %s because there are no 
existing threads running",
-        processor->getName().c_str());
+    logger_->log_info("Cannot unschedule threads for processor %s because 
there are no existing threads running", processor->getName().c_str());
     return;
   }
-  for (std::vector<std::thread *>::iterator itThread = it->second.begin();
-      itThread != it->second.end(); ++itThread) {
+  for (std::vector<std::thread *>::iterator itThread = it->second.begin(); 
itThread != it->second.end(); ++itThread) {
     std::thread *thread = *itThread;
-    logger_->log_info("Scheduled thread %d deleted for process %s",
-                      thread->get_id(), processor->getName().c_str());
+    logger_->log_info("Scheduled thread %d deleted for process %s", 
thread->get_id(), processor->getName().c_str());
     delete thread;
   }
   _threads.erase(processor->getUUIDStr());

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/TimerDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp 
b/libminifi/src/TimerDrivenSchedulingAgent.cpp
index 8610e64..b9a41ea 100644
--- a/libminifi/src/TimerDrivenSchedulingAgent.cpp
+++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp
@@ -29,24 +29,17 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 
-void TimerDrivenSchedulingAgent::run(
-    std::shared_ptr<core::Processor> processor,
-    core::ProcessContext *processContext,
-    core::ProcessSessionFactory *sessionFactory) {
+void TimerDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> 
processor, core::ProcessContext *processContext, core::ProcessSessionFactory 
*sessionFactory) {
   while (this->running_) {
-    bool shouldYield = this->onTrigger(processor, processContext,
-                                       sessionFactory);
+    bool shouldYield = this->onTrigger(processor, processContext, 
sessionFactory);
     if (processor->isYield()) {
       // Honor the yield
-      std::this_thread::sleep_for(
-          std::chrono::milliseconds(processor->getYieldTime()));
+      
std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime()));
     } else if (shouldYield && this->bored_yield_duration_ > 0) {
       // No work to do or need to apply back pressure
-      std::this_thread::sleep_for(
-          std::chrono::milliseconds(this->bored_yield_duration_));
+      
std::this_thread::sleep_for(std::chrono::milliseconds(this->bored_yield_duration_));
     }
-    std::this_thread::sleep_for(
-        std::chrono::nanoseconds(processor->getSchedulingPeriodNano()));
+    
std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano()));
   }
   return;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/controllers/SSLContextService.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/controllers/SSLContextService.cpp 
b/libminifi/src/controllers/SSLContextService.cpp
index 51a7cc4..a9450f6 100644
--- a/libminifi/src/controllers/SSLContextService.cpp
+++ b/libminifi/src/controllers/SSLContextService.cpp
@@ -51,10 +51,8 @@ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() {
   method = TLSv1_2_client_method();
   SSL_CTX *ctx = SSL_CTX_new(method);
 
-  if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM)
-      <= 0) {
-    logger_->log_error("Could not create load certificate, error : %s",
-                       std::strerror(errno));
+  if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM) <= 0) {
+    logger_->log_error("Could not create load certificate, error : %s", std::strerror(errno));
     return nullptr;
   }
   if (!IsNullOrEmpty(passphrase_)) {
@@ -62,25 +60,20 @@ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() {
     SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb);
   }
 
-  int retp = SSL_CTX_use_PrivateKey_file(ctx, private_key_.c_str(),
-                                         SSL_FILETYPE_PEM);
+  int retp = SSL_CTX_use_PrivateKey_file(ctx, private_key_.c_str(), SSL_FILETYPE_PEM);
   if (retp != 1) {
-    logger_->log_error("Could not create load private key,%i on %s error : %s",
-                       retp, private_key_, std::strerror(errno));
+    logger_->log_error("Could not create load private key,%i on %s error : %s", retp, private_key_, std::strerror(errno));
     return nullptr;
   }
 
   if (!SSL_CTX_check_private_key(ctx)) {
-    logger_->log_error(
-        "Private key does not match the public certificate, error : %s",
-        std::strerror(errno));
+    logger_->log_error("Private key does not match the public certificate, error : %s", std::strerror(errno));
     return nullptr;
   }
 
   retp = SSL_CTX_load_verify_locations(ctx, ca_certificate_.c_str(), 0);
   if (retp == 0) {
-    logger_->log_error("Can not load CA certificate, Exiting, error : %s",
-                       std::strerror(errno));
+    logger_->log_error("Can not load CA certificate, Exiting, error : %s", std::strerror(errno));
   }
   return std::unique_ptr<SSLContext>(new SSLContext(ctx));
 }
@@ -114,20 +107,16 @@ void SSLContextService::onEnable() {
   valid_ = true;
   core::Property property("Client Certificate", "Client Certificate");
   core::Property privKey("Private Key", "Private Key file");
-  core::Property passphrase_prop(
-      "Passphrase", "Client passphrase. Either a file or unencrypted text");
+  core::Property passphrase_prop("Passphrase", "Client passphrase. Either a file or unencrypted text");
   core::Property caCert("CA Certificate", "CA certificate file");
   std::string default_dir;
   if (nullptr != configuration_)
-  configuration_->get(Configure::nifi_default_directory, default_dir);
+    configuration_->get(Configure::nifi_default_directory, default_dir);
 
   logger_->log_trace("onEnable()");
 
-  if (getProperty(property.getName(), certificate)
-      && getProperty(privKey.getName(), private_key_)) {
-    logger_->log_error(
-        "Certificate and Private Key PEM file not configured, error: %s.",
-        std::strerror(errno));
+  if (getProperty(property.getName(), certificate) && getProperty(privKey.getName(), private_key_)) {
+    logger_->log_error("Certificate and Private Key PEM file not configured, error: %s.", std::strerror(errno));
 
     std::ifstream cert_file(certificate);
     std::ifstream priv_file(private_key_);
@@ -168,17 +157,14 @@ void SSLContextService::onEnable() {
     if (passphrase_file.good()) {
       passphrase_file_ = passphrase_;
       // we should read it from the file
-      passphrase_.assign((std::istreambuf_iterator<char>(passphrase_file)),
-                         std::istreambuf_iterator<char>());
+      passphrase_.assign((std::istreambuf_iterator<char>(passphrase_file)), std::istreambuf_iterator<char>());
     } else {
       std::string test_passphrase = default_dir + passphrase_;
       std::ifstream passphrase_file_test(test_passphrase);
       if (passphrase_file_test.good()) {
         passphrase_ = test_passphrase;
         passphrase_file_ = test_passphrase;
-        passphrase_.assign(
-            (std::istreambuf_iterator<char>(passphrase_file_test)),
-            std::istreambuf_iterator<char>());
+        passphrase_.assign((std::istreambuf_iterator<char>(passphrase_file_test)), std::istreambuf_iterator<char>());
       } else {
         valid_ = false;
       }
@@ -208,8 +194,7 @@ void SSLContextService::onEnable() {
 void SSLContextService::initializeTLS() {
   core::Property property("Client Certificate", "Client Certificate");
   core::Property privKey("Private Key", "Private Key file");
-  core::Property passphrase_prop(
-      "Passphrase", "Client passphrase. Either a file or unencrypted text");
+  core::Property passphrase_prop("Passphrase", "Client passphrase. Either a file or unencrypted text");
   core::Property caCert("CA Certificate", "CA certificate file");
   std::set<core::Property> supportedProperties;
   supportedProperties.insert(property);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/ClassLoader.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ClassLoader.cpp b/libminifi/src/core/ClassLoader.cpp
index 72ac80c..9bead0e 100644
--- a/libminifi/src/core/ClassLoader.cpp
+++ b/libminifi/src/core/ClassLoader.cpp
@@ -28,7 +28,9 @@ namespace nifi {
 namespace minifi {
 namespace core {
 
-ClassLoader::ClassLoader() : logger_(logging::LoggerFactory<ClassLoader>::getLogger()) {}
+ClassLoader::ClassLoader()
+    : logger_(logging::LoggerFactory<ClassLoader>::getLogger()) {
+}
 
 ClassLoader &ClassLoader::getDefaultClassLoader() {
   static ClassLoader ret;
@@ -49,8 +51,7 @@ uint16_t ClassLoader::registerResource(const std::string &resource) {
   dlerror();
 
   // load the symbols
-  createFactory* create_factory_func = reinterpret_cast<createFactory*>(dlsym(
-      resource_ptr, "createFactory"));
+  createFactory* create_factory_func = reinterpret_cast<createFactory*>(dlsym(resource_ptr, "createFactory"));
   const char* dlsym_error = dlerror();
   if (dlsym_error) {
     logger_->log_error("Cannot load library: %s", dlsym_error);
@@ -61,8 +62,7 @@ uint16_t ClassLoader::registerResource(const std::string &resource) {
 
   std::lock_guard<std::mutex> lock(internal_mutex_);
 
-  loaded_factories_[factory->getClassName()] = std::unique_ptr<ObjectFactory>(
-      factory);
+  loaded_factories_[factory->getClassName()] = std::unique_ptr<ObjectFactory>(factory);
 
   return RESOURCE_SUCCESS;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/ConfigurableComponent.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp
index 94fa599..f5247ac 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -33,8 +33,7 @@ ConfigurableComponent::ConfigurableComponent()
     : logger_(logging::LoggerFactory<ConfigurableComponent>::getLogger()) {
 }
 
-ConfigurableComponent::ConfigurableComponent(
-    const ConfigurableComponent &&other)
+ConfigurableComponent::ConfigurableComponent(const ConfigurableComponent &&other)
     : properties_(std::move(other.properties_)),
       logger_(logging::LoggerFactory<ConfigurableComponent>::getLogger()) {
 }
@@ -42,8 +41,7 @@ ConfigurableComponent::ConfigurableComponent(
 ConfigurableComponent::~ConfigurableComponent() {
 }
 
-bool ConfigurableComponent::getProperty(const std::string &name,
-                                        Property &prop) {
+bool ConfigurableComponent::getProperty(const std::string &name, Property &prop) {
   std::lock_guard<std::mutex> lock(configuration_mutex_);
 
   auto &&it = properties_.find(name);
@@ -62,16 +60,14 @@ bool ConfigurableComponent::getProperty(const std::string &name,
  * @param value value passed in by reference
  * @return result of getting property.
  */
-bool ConfigurableComponent::getProperty(const std::string name,
-                                        std::string &value) {
+bool ConfigurableComponent::getProperty(const std::string name, std::string &value) {
   std::lock_guard<std::mutex> lock(configuration_mutex_);
 
   auto &&it = properties_.find(name);
   if (it != properties_.end()) {
     Property item = it->second;
     value = item.getValue();
-    logger_->log_info("Processor %s property name %s value %s", name,
-                         item.getName(), value);
+    logger_->log_info("Processor %s property name %s value %s", name, item.getName(), value);
     return true;
   } else {
     return false;
@@ -83,8 +79,7 @@ bool ConfigurableComponent::getProperty(const std::string name,
  * @param value property value.
  * @return result of setting property.
  */
-bool ConfigurableComponent::setProperty(const std::string name,
-                                        std::string value) {
+bool ConfigurableComponent::setProperty(const std::string name, std::string value) {
   std::lock_guard<std::mutex> lock(configuration_mutex_);
   auto &&it = properties_.find(name);
 
@@ -92,8 +87,7 @@ bool ConfigurableComponent::setProperty(const std::string name,
     Property item = it->second;
     item.setValue(value);
     properties_[item.getName()] = item;
-    logger_->log_info("Component %s property name %s value %s", name.c_str(),
-                         item.getName().c_str(), value.c_str());
+    logger_->log_info("Component %s property name %s value %s", name.c_str(), item.getName().c_str(), value.c_str());
     return true;
   } else {
     return false;
@@ -106,8 +100,7 @@ bool ConfigurableComponent::setProperty(const std::string name,
  * @param value property value.
  * @return result of setting property.
  */
-bool ConfigurableComponent::updateProperty(const std::string &name,
-                                           const std::string &value) {
+bool ConfigurableComponent::updateProperty(const std::string &name, const std::string &value) {
   std::lock_guard<std::mutex> lock(configuration_mutex_);
   auto &&it = properties_.find(name);
 
@@ -115,8 +108,7 @@ bool ConfigurableComponent::updateProperty(const std::string &name,
     Property item = it->second;
     item.addValue(value);
     properties_[item.getName()] = item;
-    logger_->log_info("Component %s property name %s value %s", name.c_str(),
-                         item.getName().c_str(), value.c_str());
+    logger_->log_info("Component %s property name %s value %s", name.c_str(), item.getName().c_str(), value.c_str());
     return true;
   } else {
     return false;
@@ -137,14 +129,12 @@ bool ConfigurableComponent::setProperty(Property &prop, std::string value) {
     Property item = it->second;
     item.setValue(value);
     properties_[item.getName()] = item;
-    logger_->log_info("property name %s value %s", prop.getName().c_str(),
-                         item.getName().c_str(), value.c_str());
+    logger_->log_info("property name %s value %s", prop.getName().c_str(), item.getName().c_str(), value.c_str());
     return true;
   } else {
     Property newProp(prop);
     newProp.setValue(value);
-    properties_.insert(
-        std::pair<std::string, Property>(prop.getName(), newProp));
+    properties_.insert(std::pair<std::string, Property>(prop.getName(), newProp));
     return true;
   }
   return false;
@@ -155,8 +145,7 @@ bool ConfigurableComponent::setProperty(Property &prop, std::string value) {
  * @param supported properties
  * @return result of set operation.
  */
-bool ConfigurableComponent::setSupportedProperties(
-    std::set<Property> properties) {
+bool ConfigurableComponent::setSupportedProperties(std::set<Property> properties) {
   if (!canEdit()) {
     return false;
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/ConfigurationFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp
index 6aa42e3..ea2ed5c 100644
--- a/libminifi/src/core/ConfigurationFactory.cpp
+++ b/libminifi/src/core/ConfigurationFactory.cpp
@@ -39,50 +39,34 @@ namespace core {
 class YamlConfiguration;
 #endif
 
-std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(
-    std::shared_ptr<core::Repository> repo,
-    std::shared_ptr<core::Repository> flow_file_repo,
-    std::shared_ptr<Configure> configure,
-    std::shared_ptr<io::StreamFactory> stream_factory,
-    const std::string configuration_class_name, const std::string path,
-    bool fail_safe) {
-
+std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure,
+                                                                 std::shared_ptr<io::StreamFactory> stream_factory, const std::string configuration_class_name, const std::string path,
+                                                                 bool fail_safe) {
   std::string class_name_lc = configuration_class_name;
-  std::transform(class_name_lc.begin(), class_name_lc.end(),
-                 class_name_lc.begin(), ::tolower);
+  std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower);
   try {
     if (class_name_lc == "flowconfiguration") {
       // load the base configuration.
-      return std::unique_ptr<core::FlowConfiguration>(
-          new core::FlowConfiguration(repo, flow_file_repo, stream_factory,
-                                      configure, path));
+      return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, stream_factory, configure, path));
 
     } else if (class_name_lc == "yamlconfiguration") {
       // only load if the class is defined.
-      return std::unique_ptr<core::FlowConfiguration>(
-          instantiate<core::YamlConfiguration>(repo, flow_file_repo,
-                                               stream_factory, configure, path));
+      return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(repo, flow_file_repo, stream_factory, configure, path));
 
     } else {
       if (fail_safe) {
-        return std::unique_ptr<core::FlowConfiguration>(
-            new core::FlowConfiguration(repo, flow_file_repo, stream_factory,
-                                        configure, path));
+        return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, stream_factory, configure, path));
       } else {
-        throw std::runtime_error(
-            "Support for the provided configuration class could not be found");
+        throw std::runtime_error("Support for the provided configuration class could not be found");
       }
     }
   } catch (const std::runtime_error &r) {
     if (fail_safe) {
-      return std::unique_ptr<core::FlowConfiguration>(
-          new core::FlowConfiguration(repo, flow_file_repo, stream_factory,
-                                      configure, path));
+      return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, stream_factory, configure, path));
     }
   }
 
-  throw std::runtime_error(
-      "Support for the provided configuration class could not be found");
+  throw std::runtime_error("Support for the provided configuration class could not be found");
 }
 
 } /* namespace core */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/Connectable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Connectable.cpp b/libminifi/src/core/Connectable.cpp
index 5bd32e9..cf01f0c 100644
--- a/libminifi/src/core/Connectable.cpp
+++ b/libminifi/src/core/Connectable.cpp
@@ -47,12 +47,9 @@ Connectable::Connectable(const Connectable &&other)
 Connectable::~Connectable() {
 }
 
-bool Connectable::setSupportedRelationships(
-    std::set<core::Relationship> relationships) {
+bool Connectable::setSupportedRelationships(std::set<core::Relationship> relationships) {
   if (isRunning()) {
-    logger_->log_info(
-        "Can not set processor supported relationship while the process %s is running",
-        name_.c_str());
+    logger_->log_info("Can not set processor supported relationship while the process %s is running", name_.c_str());
     return false;
   }
 
@@ -61,8 +58,7 @@ bool Connectable::setSupportedRelationships(
   relationships_.clear();
   for (auto item : relationships) {
     relationships_[item.getName()] = item;
-    logger_->log_info("Processor %s supported relationship name %s",
-                      name_.c_str(), item.getName().c_str());
+    logger_->log_info("Processor %s supported relationship name %s", name_.c_str(), item.getName().c_str());
   }
   return true;
 }
@@ -71,10 +67,7 @@ bool Connectable::setSupportedRelationships(
 bool Connectable::isSupportedRelationship(core::Relationship relationship) {
   const bool requiresLock = isRunning();
 
-  const auto conditionalLock =
-      !requiresLock ?
-          std::unique_lock<std::mutex>() :
-          std::unique_lock<std::mutex>(relationship_mutex_);
+  const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock<std::mutex>(relationship_mutex_);
 
   const auto &it = relationships_.find(relationship.getName());
   if (it != relationships_.end()) {
@@ -84,12 +77,9 @@ bool Connectable::isSupportedRelationship(core::Relationship relationship) {
   }
 }
 
-bool Connectable::setAutoTerminatedRelationships(
-    std::set<Relationship> relationships) {
+bool Connectable::setAutoTerminatedRelationships(std::set<Relationship> relationships) {
   if (isRunning()) {
-    logger_->log_info(
-        "Can not set processor auto terminated relationship while the process %s is running",
-        name_.c_str());
+    logger_->log_info("Can not set processor auto terminated relationship while the process %s is running", name_.c_str());
     return false;
   }
 
@@ -98,8 +88,7 @@ bool Connectable::setAutoTerminatedRelationships(
   auto_terminated_relationships_.clear();
   for (auto item : relationships) {
     auto_terminated_relationships_[item.getName()] = item;
-    logger_->log_info("Processor %s auto terminated relationship name %s",
-                      name_.c_str(), item.getName().c_str());
+    logger_->log_info("Processor %s auto terminated relationship name %s", name_.c_str(), item.getName().c_str());
   }
   return true;
 }
@@ -108,10 +97,7 @@ bool Connectable::setAutoTerminatedRelationships(
 bool Connectable::isAutoTerminated(core::Relationship relationship) {
   const bool requiresLock = isRunning();
 
-  const auto conditionalLock =
-      !requiresLock ?
-          std::unique_lock<std::mutex>() :
-          std::unique_lock<std::mutex>(relationship_mutex_);
+  const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock<std::mutex>(relationship_mutex_);
 
   const auto &it = auto_terminated_relationships_.find(relationship.getName());
   if (it != auto_terminated_relationships_.end()) {
@@ -126,8 +112,7 @@ void Connectable::waitForWork(uint64_t timeoutMs) {
 
   if (!has_work_.load()) {
     std::unique_lock<std::mutex> lock(work_available_mutex_);
-    work_condition_.wait_for(lock, std::chrono::milliseconds(timeoutMs),
-                             [&] {return has_work_.load();});
+    work_condition_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [&] {return has_work_.load();});
   }
 }
 
@@ -146,8 +131,7 @@ void Connectable::notifyWork() {
   }
 }
 
-std::set<std::shared_ptr<Connectable>> Connectable::getOutGoingConnections(
-    std::string relationship) {
+std::set<std::shared_ptr<Connectable>> Connectable::getOutGoingConnections(std::string relationship) {
   std::set<std::shared_ptr<Connectable>> empty;
 
   auto &&it = out_going_connections_.find(relationship);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/FlowConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index 1ed9176..6635701 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -30,14 +30,12 @@ namespace core {
 FlowConfiguration::~FlowConfiguration() {
 }
 
-std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(
-    std::string name, uuid_t uuid) {
+std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string name, uuid_t uuid) {
   auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(name, uuid);
   if (nullptr == ptr) {
     logger_->log_error("No Processor defined for %s", name.c_str());
   }
-  std::shared_ptr<core::Processor> processor = std::static_pointer_cast<
-      core::Processor>(ptr);
+  std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr);
 
   // initialize the processor
   processor->initialize();
@@ -48,37 +46,27 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(
 std::shared_ptr<core::Processor> FlowConfiguration::createProvenanceReportTask() {
   std::shared_ptr<core::Processor> processor = nullptr;
 
-  processor =
-      std::make_shared<
-          org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(
-          stream_factory_);
+  processor = std::make_shared<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(stream_factory_);
   // initialize the processor
   processor->initialize();
 
   return processor;
 }
 
-std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(
-    std::string name, uuid_t uuid) {
-  return std::unique_ptr<core::ProcessGroup>(
-      new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid));
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(std::string name, uuid_t uuid) {
+  return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid));
 }
 
-std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(
-    std::string name, uuid_t uuid) {
-  return std::unique_ptr<core::ProcessGroup>(
-      new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid));
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(std::string name, uuid_t uuid) {
+  return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid));
 }
 
-std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(
-    std::string name, uuid_t uuid) {
+std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(std::string name, uuid_t uuid) {
   return std::make_shared<minifi::Connection>(flow_file_repo_, name, uuid);
 }
 
-std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(
-    const std::string &class_name, const std::string &name, uuid_t uuid) {
-  std::shared_ptr<core::controller::ControllerServiceNode> controllerServicesNode =
-      service_provider_->createControllerService(class_name, name, true);
+std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(const std::string &class_name, const std::string &name, uuid_t uuid) {
+  std::shared_ptr<core::controller::ControllerServiceNode> controllerServicesNode = service_provider_->createControllerService(class_name, name, true);
   if (nullptr != controllerServicesNode)
     controllerServicesNode->setUUID(uuid);
   return controllerServicesNode;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/FlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp
index 8c693bf..af73b91 100644
--- a/libminifi/src/core/FlowFile.cpp
+++ b/libminifi/src/core/FlowFile.cpp
@@ -178,8 +178,7 @@ void FlowFile::setLineageStartDate(const uint64_t date) {
  * Sets the original connection with a shared pointer.
  * @param connection shared connection.
  */
-void FlowFile::setOriginalConnection(
-    std::shared_ptr<core::Connectable> &connection) {
+void FlowFile::setOriginalConnection(std::shared_ptr<core::Connectable> &connection) {
   original_connection_ = connection;
 }
 

Reply via email to