http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/FlowFileRecord.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp index 614ee28..0ee33b3 100644 --- a/libminifi/src/FlowFileRecord.cpp +++ b/libminifi/src/FlowFileRecord.cpp @@ -39,10 +39,7 @@ namespace minifi { std::atomic<uint64_t> FlowFileRecord::local_flow_seq_number_(0); -FlowFileRecord::FlowFileRecord( - std::shared_ptr<core::Repository> flow_repository, - std::map<std::string, std::string> attributes, - std::shared_ptr<ResourceClaim> claim) +FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, std::map<std::string, std::string> attributes, std::shared_ptr<ResourceClaim> claim) : FlowFile(), flow_repository_(flow_repository), logger_(logging::LoggerFactory<FlowFileRecord>::getLogger()) { @@ -68,9 +65,7 @@ FlowFileRecord::FlowFileRecord( claim_->increaseFlowFileRecordOwnedCount(); } -FlowFileRecord::FlowFileRecord( - std::shared_ptr<core::Repository> flow_repository, - std::shared_ptr<core::FlowFile> &event, const std::string &uuidConnection) +FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::FlowFile> &event, const std::string &uuidConnection) : FlowFile(), snapshot_(""), flow_repository_(flow_repository), @@ -89,9 +84,7 @@ FlowFileRecord::FlowFileRecord( } } -FlowFileRecord::FlowFileRecord( - std::shared_ptr<core::Repository> flow_repository, - std::shared_ptr<core::FlowFile> &event) +FlowFileRecord::FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, std::shared_ptr<core::FlowFile> &event) : FlowFile(), uuid_connection_(""), snapshot_(""), @@ -109,8 +102,7 @@ FlowFileRecord::~FlowFileRecord() { claim_->decreaseFlowFileRecordOwnedCount(); std::string value; if (claim_->getFlowFileRecordOwnedCount() <= 0) { - logger_->log_debug("Delete Resource Claim %s", - claim_->getContentFullPath().c_str()); + logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath().c_str()); if (!this->stored || !flow_repository_->Get(uuid_str_, value)) { std::remove(claim_->getContentFullPath().c_str()); } @@ -138,8 +130,7 @@ bool FlowFileRecord::removeKeyedAttribute(FlowAttribute key) { } } -bool FlowFileRecord::updateKeyedAttribute(FlowAttribute key, - std::string value) { +bool FlowFileRecord::updateKeyedAttribute(FlowAttribute key, std::string value) { const char *keyStr = FlowAttributeKey(key); if (keyStr) { std::string keyString = keyStr; @@ -174,25 +165,19 @@ bool FlowFileRecord::DeSerialize(std::string key) { ret = flow_repository_->Get(key, value); if (!ret) { - logger_->log_error("NiFi FlowFile Store event %s can not found", - key.c_str()); + logger_->log_error("NiFi FlowFile Store event %s can not found", key.c_str()); return false; } else { - logger_->log_debug("NiFi FlowFile Read event %s length %d", key.c_str(), - value.length()); + logger_->log_debug("NiFi FlowFile Read event %s length %d", key.c_str(), value.length()); } io::DataStream stream((const uint8_t*) value.data(), value.length()); ret = DeSerialize(stream); if (ret) { - logger_->log_debug( - "NiFi FlowFile retrieve uuid %s size %d connection %s success", - uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str()); + logger_->log_debug("NiFi FlowFile retrieve uuid %s size %d connection %s success", uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str()); } else { - logger_->log_debug( - "NiFi FlowFile retrieve uuid %s size %d connection %d fail", - uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str()); + logger_->log_debug("NiFi FlowFile retrieve uuid %s size %d connection %d fail", uuid_str_.c_str(), stream.getSize(), uuid_connection_.c_str()); } return ret; @@ -260,15 +245,11 @@ bool FlowFileRecord::Serialize() { return false; } - if (flow_repository_->Put(uuid_str_, - const_cast<uint8_t*>(outStream.getBuffer()), - outStream.getSize())) { - logger_->log_debug("NiFi FlowFile Store event %s size %d success", - uuid_str_.c_str(), outStream.getSize()); + if (flow_repository_->Put(uuid_str_, const_cast<uint8_t*>(outStream.getBuffer()), outStream.getSize())) { + logger_->log_debug("NiFi FlowFile Store event %s size %d success", uuid_str_.c_str(), outStream.getSize()); return true; } else { - logger_->log_error("NiFi FlowFile Store event %s size %d fail", - uuid_str_.c_str(), outStream.getSize()); + logger_->log_error("NiFi FlowFile Store event %s size %d fail", uuid_str_.c_str(), outStream.getSize()); return false; }
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/Properties.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp index f877e7a..abebfbb 100644 --- a/libminifi/src/Properties.cpp +++ b/libminifi/src/Properties.cpp @@ -28,7 +28,9 @@ namespace minifi { #define BUFFER_SIZE 512 -Properties::Properties() : logger_(logging::LoggerFactory<Properties>::getLogger()) {} +Properties::Properties() + : logger_(logging::LoggerFactory<Properties>::getLogger()) { +} // Get the config value bool Properties::get(std::string key, std::string &value) { @@ -62,8 +64,7 @@ void Properties::parseConfigureFileLine(char *buf) { ++line; char first = line[0]; - if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') - || (first == '=')) { + if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') || (first == '=')) { return; } @@ -96,8 +97,7 @@ void Properties::loadConfigureFile(const char *fileName) { if (fileName) { // perform a naive determination if this is a relative path if (fileName[0] != '/') { - adjustedFilename = adjustedFilename + getHome() + "/" - + fileName; + adjustedFilename = adjustedFilename + getHome() + "/" + fileName; } else { adjustedFilename += fileName; } @@ -115,8 +115,7 @@ void Properties::loadConfigureFile(const char *fileName) { this->clear(); char buf[BUFFER_SIZE]; - for (file.getline(buf, BUFFER_SIZE); file.good(); - file.getline(buf, BUFFER_SIZE)) { + for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) { parseConfigureFileLine(buf); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index 2e7e61a..ca8d3be 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -43,39 +43,29 @@ namespace apache { namespace nifi { namespace minifi { -const std::string RemoteProcessorGroupPort::ProcessorName( - "RemoteProcessorGroupPort"); -core::Property RemoteProcessorGroupPort::hostName("Host Name", - "Remote Host Name.", - "localhost"); +const char *RemoteProcessorGroupPort::ProcessorName("RemoteProcessorGroupPort"); +core::Property RemoteProcessorGroupPort::hostName("Host Name", "Remote Host Name.", "localhost"); core::Property RemoteProcessorGroupPort::port("Port", "Remote Port", "9999"); -core::Property RemoteProcessorGroupPort::portUUID( - "Port UUID", "Specifies remote NiFi Port UUID.", ""); +core::Property RemoteProcessorGroupPort::portUUID("Port UUID", "Specifies remote NiFi Port UUID.", ""); core::Relationship RemoteProcessorGroupPort::relation; std::unique_ptr<Site2SiteClientProtocol> RemoteProcessorGroupPort::getNextProtocol( - bool create = true) { +bool create = true) { std::unique_ptr<Site2SiteClientProtocol> nextProtocol = nullptr; if (!available_protocols_.try_dequeue(nextProtocol)) { if (create) { // create - nextProtocol = std::unique_ptr<Site2SiteClientProtocol>( - new Site2SiteClientProtocol(nullptr)); + nextProtocol = std::unique_ptr<Site2SiteClientProtocol>(new Site2SiteClientProtocol(nullptr)); nextProtocol->setPortId(protocol_uuid_); - std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = - std::unique_ptr<org::apache::nifi::minifi::io::DataStream>( - stream_factory_->createSocket(host_, port_)); - std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>( - new Site2SitePeer(std::move(str), host_, port_)); + std::unique_ptr<org::apache::nifi::minifi::io::DataStream> str = std::unique_ptr<org::apache::nifi::minifi::io::DataStream>(stream_factory_->createSocket(host_, port_)); + std::unique_ptr<Site2SitePeer> peer_ = std::unique_ptr<Site2SitePeer>(new Site2SitePeer(std::move(str), host_, port_)); nextProtocol->setPeer(std::move(peer_)); } } return std::move(nextProtocol); } -void RemoteProcessorGroupPort::returnProtocol( - std::unique_ptr<Site2SiteClientProtocol> return_protocol) { - +void RemoteProcessorGroupPort::returnProtocol(std::unique_ptr<Site2SiteClientProtocol> return_protocol) { if (available_protocols_.size_approx() >= max_concurrent_tasks_) { // let the memory be freed return; @@ -96,9 +86,7 @@ void RemoteProcessorGroupPort::initialize() { setSupportedRelationships(relationships); } -void RemoteProcessorGroupPort::onSchedule( - core::ProcessContext *context, - core::ProcessSessionFactory *sessionFactory) { +void RemoteProcessorGroupPort::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { std::string value; int64_t lvalue; @@ -106,8 +94,7 @@ void RemoteProcessorGroupPort::onSchedule( if (context->getProperty(hostName.getName(), value)) { host_ = value; } - if (context->getProperty(port.getName(), value) - && core::Property::StringToInt(value, lvalue)) { + if (context->getProperty(port.getName(), value) && core::Property::StringToInt(value, lvalue)) { port_ = (uint16_t) lvalue; } if (context->getProperty(portUUID.getName(), value)) { @@ -115,8 +102,7 @@ void RemoteProcessorGroupPort::onSchedule( } } -void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, - core::ProcessSession *session) { +void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { if (!transmitting_) return; @@ -130,8 +116,7 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, if (context->getProperty(hostName.getName(), value)) { host = value; } - if (context->getProperty(port.getName(), value) - && core::Property::StringToInt(value, lvalue)) { + if (context->getProperty(port.getName(), value) && core::Property::StringToInt(value, lvalue)) { sport = (uint16_t) lvalue; } if (context->getProperty(portUUID.getName(), value)) { @@ -148,10 +133,8 @@ void RemoteProcessorGroupPort::onTrigger(core::ProcessContext *context, if (!protocol_->bootstrap()) { // bootstrap the client protocol if needeed context->yield(); - std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>( - context->getProcessorNode().getProcessor()); - logger_->log_error("Site2Site bootstrap failed yield period %d peer ", - processor->getYieldPeriodMsec()); + std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(context->getProcessorNode().getProcessor()); + logger_->log_error("Site2Site bootstrap failed yield period %d peer ", processor->getYieldPeriodMsec()); returnProtocol(std::move(protocol_)); return; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/SchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp index c582c52..24ba146 100644 --- a/libminifi/src/SchedulingAgent.cpp +++ b/libminifi/src/SchedulingAgent.cpp @@ -33,18 +33,14 @@ namespace minifi { bool SchedulingAgent::hasWorkToDo(std::shared_ptr<core::Processor> processor) { // Whether it has work to do - if (processor->getTriggerWhenEmpty() || !processor->hasIncomingConnections() - || processor->flowFilesQueued()) + if (processor->getTriggerWhenEmpty() || !processor->hasIncomingConnections() || processor->flowFilesQueued()) return true; else return false; } -void SchedulingAgent::enableControllerService( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - - logger_->log_trace("Enabling CSN in SchedulingAgent %s", - serviceNode->getName()); +void SchedulingAgent::enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { + logger_->log_trace("Enabling CSN in SchedulingAgent %s", serviceNode->getName()); // reference the enable function from serviceNode std::function<bool()> f_ex = [serviceNode] { return serviceNode->enable(); @@ -57,9 +53,7 @@ void SchedulingAgent::enableControllerService( component_lifecycle_thread_pool_.execute(std::move(functor), future); } -void SchedulingAgent::disableControllerService( - std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { - +void SchedulingAgent::disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) { // reference the disable function from serviceNode std::function<bool()> f_ex = [serviceNode] { return serviceNode->disable(); @@ -72,14 +66,11 @@ void SchedulingAgent::disableControllerService( component_lifecycle_thread_pool_.execute(std::move(functor), future); } -bool SchedulingAgent::hasTooMuchOutGoing( - std::shared_ptr<core::Processor> processor) { +bool SchedulingAgent::hasTooMuchOutGoing(std::shared_ptr<core::Processor> processor) { return processor->flowFilesOutGoingFull(); } -bool SchedulingAgent::onTrigger(std::shared_ptr<core::Processor> processor, - core::ProcessContext *processContext, - core::ProcessSessionFactory *sessionFactory) { +bool SchedulingAgent::onTrigger(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) { if (processor->isYield()) return false; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/Site2SiteClientProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Site2SiteClientProtocol.cpp b/libminifi/src/Site2SiteClientProtocol.cpp index 475d49d..08ee665 100644 --- a/libminifi/src/Site2SiteClientProtocol.cpp +++ b/libminifi/src/Site2SiteClientProtocol.cpp @@ -38,8 +38,7 @@ namespace minifi { bool Site2SiteClientProtocol::establish() { if (_peerState != IDLE) { - logger_->log_error( - "Site2Site peer state is not idle while try to establish"); + logger_->log_error("Site2Site peer state is not idle while try to establish"); return false; } @@ -70,14 +69,11 @@ bool Site2SiteClientProtocol::establish() { bool Site2SiteClientProtocol::initiateResourceNegotiation() { // Negotiate the version if (_peerState != IDLE) { - logger_->log_error( - "Site2Site peer state is not idle while initiateResourceNegotiation"); + logger_->log_error("Site2Site peer state is not idle while initiateResourceNegotiation"); return false; } - logger_->log_info( - "Negotiate protocol version with destination port %s current version %d", - _portIdStr.c_str(), _currentVersion); + logger_->log_info("Negotiate protocol version with destination port %s current version %d", _portIdStr.c_str(), _currentVersion); int ret = peer_->writeUTF(this->getResourceName()); @@ -106,39 +102,35 @@ bool Site2SiteClientProtocol::initiateResourceNegotiation() { } logger_->log_info("status code is %i", statusCode); switch (statusCode) { - case RESOURCE_OK: - logger_->log_info("Site2Site Protocol Negotiate protocol version OK"); - return true; - case DIFFERENT_RESOURCE_VERSION: - uint32_t serverVersion; - ret = peer_->read(serverVersion); - if (ret <= 0) { + case RESOURCE_OK: + logger_->log_info("Site2Site Protocol Negotiate protocol version OK"); + return true; + case DIFFERENT_RESOURCE_VERSION: + uint32_t serverVersion; + ret = peer_->read(serverVersion); + if (ret <= 0) { + // tearDown(); + return false; + } + logger_->log_info("Site2Site Server Response asked for a different protocol version %d", serverVersion); + for (unsigned int i = (_currentVersionIndex + 1); i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) { + if (serverVersion >= _supportedVersion[i]) { + _currentVersion = _supportedVersion[i]; + _currentVersionIndex = i; + return initiateResourceNegotiation(); + } + } + ret = -1; // tearDown(); return false; - } - logger_->log_info( - "Site2Site Server Response asked for a different protocol version %d", - serverVersion); - for (unsigned int i = (_currentVersionIndex + 1); - i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) { - if (serverVersion >= _supportedVersion[i]) { - _currentVersion = _supportedVersion[i]; - _currentVersionIndex = i; - return initiateResourceNegotiation(); - } - } - ret = -1; - // tearDown(); - return false; - case NEGOTIATED_ABORT: - logger_->log_info("Site2Site Negotiate protocol response ABORT"); - ret = -1; - // tearDown(); - return false; - default: - logger_->log_info("Negotiate protocol response unknown code %d", - statusCode); - return true; + case NEGOTIATED_ABORT: + logger_->log_info("Site2Site Negotiate protocol response ABORT"); + ret = -1; + // tearDown(); + return false; + default: + logger_->log_info("Negotiate protocol response unknown code %d", statusCode); + return true; } return true; @@ -147,14 +139,11 @@ bool Site2SiteClientProtocol::initiateResourceNegotiation() { bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() { // Negotiate the version if (_peerState != HANDSHAKED) { - logger_->log_error( - "Site2Site peer state is not handshaked while initiateCodecResourceNegotiation"); + logger_->log_error("Site2Site peer state is not handshaked while initiateCodecResourceNegotiation"); return false; } - logger_->log_info( - "Negotiate Codec version with destination port %s current version %d", - _portIdStr.c_str(), _currentCodecVersion); + logger_->log_info("Negotiate Codec version with destination port %s current version %d", _portIdStr.c_str(), _currentCodecVersion); int ret = peer_->writeUTF(this->getCodecResourceName()); @@ -181,38 +170,35 @@ bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() { } switch (statusCode) { - case RESOURCE_OK: - logger_->log_info("Site2Site Codec Negotiate version OK"); - return true; - case DIFFERENT_RESOURCE_VERSION: - uint32_t serverVersion; - ret = peer_->read(serverVersion); - if (ret <= 0) { + case RESOURCE_OK: + logger_->log_info("Site2Site Codec Negotiate version OK"); + return true; + case DIFFERENT_RESOURCE_VERSION: + uint32_t serverVersion; + ret = peer_->read(serverVersion); + if (ret <= 0) { + // tearDown(); + return false; + } + logger_->log_info("Site2Site Server Response asked for a different codec version %d", serverVersion); + for (unsigned int i = (_currentCodecVersionIndex + 1); i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) { + if (serverVersion >= _supportedCodecVersion[i]) { + _currentCodecVersion = _supportedCodecVersion[i]; + _currentCodecVersionIndex = i; + return initiateCodecResourceNegotiation(); + } + } + ret = -1; // tearDown(); return false; - } - logger_->log_info( - "Site2Site Server Response asked for a different codec version %d", - serverVersion); - for (unsigned int i = (_currentCodecVersionIndex + 1); - i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) { - if (serverVersion >= _supportedCodecVersion[i]) { - _currentCodecVersion = _supportedCodecVersion[i]; - _currentCodecVersionIndex = i; - return initiateCodecResourceNegotiation(); - } - } - ret = -1; - // tearDown(); - return false; - case NEGOTIATED_ABORT: - logger_->log_info("Site2Site Codec Negotiate response ABORT"); - ret = -1; - // tearDown(); - return false; - default: - logger_->log_info("Negotiate Codec response unknown code %d", statusCode); - return true; + case NEGOTIATED_ABORT: + logger_->log_info("Site2Site Codec Negotiate response ABORT"); + ret = -1; + // tearDown(); + return false; + default: + logger_->log_info("Negotiate Codec response unknown code %d", statusCode); + return true; } return true; @@ -220,13 +206,10 @@ bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() { bool Site2SiteClientProtocol::handShake() { if (_peerState != ESTABLISHED) { - logger_->log_error( - "Site2Site peer state is not established while handshake"); + logger_->log_error("Site2Site peer state is not established while handshake"); return false; } - logger_->log_info( - "Site2Site Protocol Perform hand shake with destination port %s", - _portIdStr.c_str()); + logger_->log_info("Site2Site Protocol Perform hand shake with destination port %s", _portIdStr.c_str()); uuid_t uuid; // Generate the global UUID for the com identify uuid_generate(uuid); @@ -244,18 +227,14 @@ bool Site2SiteClientProtocol::handShake() { std::map<std::string, std::string> properties; properties[HandShakePropertyStr[GZIP]] = "false"; properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr; - properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string( - this->_timeOut); + properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(this->_timeOut); if (this->_currentVersion >= 5) { if (this->_batchCount > 0) - properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string( - this->_batchCount); + properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string(this->_batchCount); if (this->_batchSize > 0) - properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string( - this->_batchSize); + properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string(this->_batchSize); if (this->_batchDuration > 0) - properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string( - this->_batchDuration); + properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string(this->_batchDuration); } if (_currentVersion >= 3) { @@ -285,8 +264,7 @@ bool Site2SiteClientProtocol::handShake() { // tearDown(); return false; } - logger_->log_info("Site2Site Protocol Send handshake properties %s %s", - it->first.c_str(), it->second.c_str()); + logger_->log_info("Site2Site Protocol Send handshake properties %s %s", it->first.c_str(), it->second.c_str()); } RespondCode code; @@ -307,16 +285,14 @@ bool Site2SiteClientProtocol::handShake() { case PORT_NOT_IN_VALID_STATE: case UNKNOWN_PORT: case PORTS_DESTINATION_FULL: - logger_->log_error( - "Site2Site HandShake Failed because destination port is either invalid or full"); + logger_->log_error("Site2Site HandShake Failed because destination port is either invalid or full"); ret = -1; /* peer_->yield(); tearDown(); */ return false; default: - logger_->log_info("HandShake Failed because of unknown respond code %d", - code); + logger_->log_info("HandShake Failed because of unknown respond code %d", code); ret = -1; /* peer_->yield(); @@ -368,8 +344,7 @@ int Site2SiteClientProtocol::readRequestType(RequestType &type) { return -1; } -int Site2SiteClientProtocol::readRespond(RespondCode &code, - std::string &message) { +int Site2SiteClientProtocol::readRespond(RespondCode &code, std::string &message) { uint8_t firstByte; int ret = peer_->read(firstByte); @@ -407,8 +382,7 @@ int Site2SiteClientProtocol::readRespond(RespondCode &code, return 3 + message.size(); } -int Site2SiteClientProtocol::writeRespond(RespondCode code, - std::string message) { +int Site2SiteClientProtocol::writeRespond(RespondCode code, std::string message) { RespondCodeContext *resCode = this->getRespondCodeContext(code); if (resCode == NULL) { @@ -440,14 +414,11 @@ int Site2SiteClientProtocol::writeRespond(RespondCode code, bool Site2SiteClientProtocol::negotiateCodec() { if (_peerState != HANDSHAKED) { - logger_->log_error( - "Site2Site peer state is not handshaked while negotiate codec"); + logger_->log_error("Site2Site peer state is not handshaked while negotiate codec"); return false; } - logger_->log_info( - "Site2Site Protocol Negotiate Codec with destination port %s", - _portIdStr.c_str()); + logger_->log_info("Site2Site Protocol Negotiate Codec with destination port %s", _portIdStr.c_str()); int status = this->writeRequestType(NEGOTIATE_FLOWFILE_CODEC); @@ -467,8 +438,7 @@ bool Site2SiteClientProtocol::negotiateCodec() { return false; } - logger_->log_info( - "Site2Site Codec Completed and move to READY state for data transfer"); + logger_->log_info("Site2Site Codec Completed and move to READY state for data transfer"); _peerState = READY; return true; @@ -490,8 +460,7 @@ bool Site2SiteClientProtocol::bootstrap() { } } -Transaction* Site2SiteClientProtocol::createTransaction( - std::string &transactionID, TransferDirection direction) { +Transaction* Site2SiteClientProtocol::createTransaction(std::string &transactionID, TransferDirection direction) { int ret; bool dataAvailable; Transaction *transaction = NULL; @@ -522,8 +491,7 @@ Transaction* Site2SiteClientProtocol::createTransaction( return NULL; } - org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream( - peer_.get()); + org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream(peer_.get()); switch (code) { case MORE_DATA: dataAvailable = true; @@ -532,8 +500,7 @@ Transaction* Site2SiteClientProtocol::createTransaction( _transactionMap[transaction->getUUIDStr()] = transaction; transactionID = transaction->getUUIDStr(); transaction->setDataAvailable(dataAvailable); - logger_->log_info("Site2Site create transaction %s", - transaction->getUUIDStr().c_str()); + logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); return transaction; case NO_MORE_DATA: dataAvailable = false; @@ -542,12 +509,10 @@ Transaction* Site2SiteClientProtocol::createTransaction( _transactionMap[transaction->getUUIDStr()] = transaction; transactionID = transaction->getUUIDStr(); transaction->setDataAvailable(dataAvailable); - logger_->log_info("Site2Site create transaction %s", - transaction->getUUIDStr().c_str()); + logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); return transaction; default: - logger_->log_info( - "Site2Site got unexpected response %d when asking for data", code); + logger_->log_info("Site2Site got unexpected response %d when asking for data", code); // tearDown(); return NULL; } @@ -558,20 +523,17 @@ Transaction* Site2SiteClientProtocol::createTransaction( // tearDown(); return NULL; } else { - org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream( - peer_.get()); + org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream(peer_.get()); transaction = new Transaction(direction, crcstream); _transactionMap[transaction->getUUIDStr()] = transaction; transactionID = transaction->getUUIDStr(); - logger_->log_info("Site2Site create transaction %s", - transaction->getUUIDStr().c_str()); + logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); return transaction; } } } -bool Site2SiteClientProtocol::receive(std::string transactionID, - DataPacket *packet, bool &eof) { +bool Site2SiteClientProtocol::receive(std::string transactionID, DataPacket *packet, bool &eof) { int ret; Transaction *transaction = NULL; @@ -583,8 +545,7 @@ bool Site2SiteClientProtocol::receive(std::string transactionID, return false; } - std::map<std::string, Transaction *>::iterator it = - this->_transactionMap.find(transactionID); + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); if (it == _transactionMap.end()) { return false; @@ -592,17 +553,13 @@ bool Site2SiteClientProtocol::receive(std::string transactionID, transaction = it->second; } - if (transaction->getState() != TRANSACTION_STARTED - && transaction->getState() != DATA_EXCHANGED) { - logger_->log_info( - "Site2Site transaction %s is not at started or exchanged state", - transactionID.c_str()); + if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) { + logger_->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str()); return false; } if (transaction->getDirection() != RECEIVE) { - logger_->log_info("Site2Site transaction %s direction is wrong", - transactionID.c_str()); + logger_->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str()); return false; } @@ -622,19 +579,13 @@ bool Site2SiteClientProtocol::receive(std::string transactionID, return false; } if (code == CONTINUE_TRANSACTION) { - logger_->log_info( - "Site2Site transaction %s peer indicate continue transaction", - transactionID.c_str()); + logger_->log_info("Site2Site transaction %s peer indicate continue transaction", transactionID.c_str()); transaction->_dataAvailable = true; } else if (code == FINISH_TRANSACTION) { - logger_->log_info( - "Site2Site transaction %s peer indicate finish transaction", - transactionID.c_str()); + logger_->log_info("Site2Site transaction %s peer indicate finish transaction", transactionID.c_str()); transaction->_dataAvailable = false; } else { - logger_->log_info( - "Site2Site transaction %s peer indicate wrong respond code %d", - transactionID.c_str(), code); + logger_->log_info("Site2Site transaction %s peer indicate wrong respond code %d", transactionID.c_str(), code); return false; } } @@ -664,9 +615,7 @@ bool Site2SiteClientProtocol::receive(std::string transactionID, return false; } packet->_attributes[key] = value; - logger_->log_info( - "Site2Site transaction %s receives attribute key %s value %s", - transactionID.c_str(), key.c_str(), value.c_str()); + logger_->log_info("Site2Site transaction %s receives attribute key %s value %s", transactionID.c_str(), key.c_str(), value.c_str()); } uint64_t len; @@ -679,17 +628,12 @@ bool Site2SiteClientProtocol::receive(std::string transactionID, transaction->_transfers++; transaction->_state = DATA_EXCHANGED; transaction->_bytes += len; - logger_->log_info( - "Site2Site transaction %s receives flow record %d, total length %d", - transactionID.c_str(), transaction->_transfers, transaction->_bytes); + logger_->log_info("Site2Site transaction %s receives flow record %d, total length %d", transactionID.c_str(), transaction->_transfers, transaction->_bytes); return true; } -bool Site2SiteClientProtocol::send(std::string transactionID, - DataPacket *packet, - std::shared_ptr<FlowFileRecord> flowFile, - core::ProcessSession *session) { +bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile, core::ProcessSession *session) { int ret; Transaction *transaction = NULL; @@ -701,8 +645,7 @@ bool Site2SiteClientProtocol::send(std::string transactionID, return false; } - std::map<std::string, Transaction *>::iterator it = - this->_transactionMap.find(transactionID); + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); if (it == _transactionMap.end()) { return false; @@ -710,17 +653,13 @@ bool Site2SiteClientProtocol::send(std::string transactionID, transaction = it->second; } - if (transaction->getState() != TRANSACTION_STARTED - && transaction->getState() != DATA_EXCHANGED) { - logger_->log_info( - "Site2Site transaction %s is not at started or exchanged state", - transactionID.c_str()); + if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) { + logger_->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str()); return false; } if (transaction->getDirection() != SEND) { - logger_->log_info("Site2Site transaction %s direction is wrong", - transactionID.c_str()); + logger_->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str()); return false; } @@ -739,8 +678,7 @@ bool Site2SiteClientProtocol::send(std::string transactionID, } std::map<std::string, std::string>::iterator itAttribute; - for (itAttribute = packet->_attributes.begin(); - itAttribute != packet->_attributes.end(); itAttribute++) { + for (itAttribute = packet->_attributes.begin(); itAttribute != packet->_attributes.end(); itAttribute++) { ret = transaction->getStream().writeUTF(itAttribute->first, true); if (ret <= 0) { @@ -750,9 +688,7 @@ bool Site2SiteClientProtocol::send(std::string transactionID, if (ret <= 0) { return false; } - logger_->log_info("Site2Site transaction %s send attribute key %s value %s", - transactionID.c_str(), itAttribute->first.c_str(), - itAttribute->second.c_str()); + logger_->log_info("Site2Site transaction %s send attribute key %s value %s", transactionID.c_str(), itAttribute->first.c_str(), itAttribute->second.c_str()); } uint64_t len = 0; @@ -777,8 +713,7 @@ bool Site2SiteClientProtocol::send(std::string transactionID, return false; } - ret = transaction->getStream().writeData( - reinterpret_cast<uint8_t *> (const_cast<char*> (packet->payload_.c_str())), len); + ret = transaction->getStream().writeData(reinterpret_cast<uint8_t *>(const_cast<char*>(packet->payload_.c_str())), len); if (ret != len) { return false; } @@ -788,15 +723,12 @@ bool Site2SiteClientProtocol::send(std::string transactionID, transaction->_transfers++; transaction->_state = DATA_EXCHANGED; transaction->_bytes += len; - logger_->log_info( - "Site2Site transaction %s send flow record %d, total length %d", - transactionID.c_str(), transaction->_transfers, transaction->_bytes); + logger_->log_info("Site2Site transaction %s send flow record %d, total length %d", transactionID.c_str(), transaction->_transfers, transaction->_bytes); return true; } -void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, - core::ProcessSession *session) { +void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, core::ProcessSession *session) { uint64_t bytes = 0; int transfers = 0; Transaction *transaction = NULL; @@ -808,8 +740,7 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, if (_peerState != READY) { context->yield(); tearDown(); - throw Exception(SITE2SITE_EXCEPTION, - "Can not establish handshake with peer"); + throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer"); return; } @@ -840,8 +771,7 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, // transaction done break; } - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast< - FlowFileRecord>(session->create()); + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); if (!flowFile) { throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed"); @@ -849,8 +779,7 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, } std::map<std::string, std::string>::iterator it; std::string sourceIdentifier; - for (it = packet._attributes.begin(); it != packet._attributes.end(); - it++) { + for (it = packet._attributes.begin(); it != packet._attributes.end(); it++) { if (it->first == FlowAttributeKey(UUID)) sourceIdentifier = it->second; flowFile->addAttribute(it->first, it->second); @@ -867,11 +796,8 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, core::Relationship relation; // undefined relationship uint64_t endTime = getTimeMillis(); std::string transitUri = peer_->getURL() + "/" + sourceIdentifier; - std::string details = "urn:nifi:" + sourceIdentifier + "Remote Host=" - + peer_->getHostName(); - session->getProvenanceReporter()->receive(flowFile, transitUri, - sourceIdentifier, details, - endTime - startTime); + std::string details = "urn:nifi:" + sourceIdentifier + "Remote Host=" + peer_->getHostName(); + session->getProvenanceReporter()->receive(flowFile, transitUri, sourceIdentifier, details, endTime - startTime); session->transfer(flowFile, relation); // receive the transfer for the flow record bytes += packet._size; @@ -886,9 +812,7 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, throw Exception(SITE2SITE_EXCEPTION, "Complete Transaction Failed"); return; } - logger_->log_info( - "Site2Site transaction %s successfully receive flow record %d, content bytes %d", - transactionID.c_str(), transfers, bytes); + logger_->log_info("Site2Site transaction %s successfully receive flow record %d, content bytes %d", transactionID.c_str(), transfers, bytes); // we yield the receive if we did not get anything if (transfers == 0) context->yield(); @@ -904,8 +828,7 @@ void Site2SiteClientProtocol::receiveFlowFiles(core::ProcessContext *context, deleteTransaction(transactionID); context->yield(); tearDown(); - logger_->log_debug( - "Caught Exception during Site2SiteClientProtocol::receiveFlowFiles"); + logger_->log_debug("Caught Exception during Site2SiteClientProtocol::receiveFlowFiles"); throw; } @@ -926,8 +849,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) { return false; } - std::map<std::string, Transaction *>::iterator it = - this->_transactionMap.find(transactionID); + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); if (it == _transactionMap.end()) { return false; @@ -935,9 +857,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) { transaction = it->second; } - if (transaction->getState() == TRANSACTION_STARTED - && !transaction->isDataAvailable() - && transaction->getDirection() == RECEIVE) { + if (transaction->getState() == TRANSACTION_STARTED && !transaction->isDataAvailable() && transaction->getDirection() == RECEIVE) { transaction->_state = TRANSACTION_CONFIRMED; return true; } @@ -957,8 +877,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) { // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. int64_t crcValue = transaction->getCRC(); std::string crc = std::to_string(crcValue); - logger_->log_info("Site2Site Send confirm with CRC %d to transaction %s", - transaction->getCRC(), transactionID.c_str()); + logger_->log_info("Site2Site Send confirm with CRC %d to transaction %s", transaction->getCRC(), transactionID.c_str()); ret = writeRespond(CONFIRM_TRANSACTION, crc); if (ret <= 0) return false; @@ -969,25 +888,21 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) { return false; if (code == CONFIRM_TRANSACTION) { - logger_->log_info("Site2Site transaction %s peer confirm transaction", - transactionID.c_str()); + logger_->log_info("Site2Site transaction %s peer confirm transaction", transactionID.c_str()); transaction->_state = TRANSACTION_CONFIRMED; return true; } else if (code == BAD_CHECKSUM) { - logger_->log_info("Site2Site transaction %s peer indicate bad checksum", - transactionID.c_str()); + logger_->log_info("Site2Site transaction %s peer indicate bad checksum", transactionID.c_str()); /* transaction->_state = TRANSACTION_CONFIRMED; return true; */ return false; } else { - logger_->log_info("Site2Site transaction %s peer unknown respond code %d", - transactionID.c_str(), code); + logger_->log_info("Site2Site transaction %s peer unknown respond code %d", transactionID.c_str(), code); return false; } } else { - logger_->log_info("Site2Site Send FINISH TRANSACTION for transaction %s", - transactionID.c_str()); + logger_->log_info("Site2Site Send FINISH TRANSACTION for transaction %s", transactionID.c_str()); ret = writeRespond(FINISH_TRANSACTION, "FINISH_TRANSACTION"); if (ret <= 0) return false; @@ -999,23 +914,19 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) { // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response if (code == CONFIRM_TRANSACTION) { - logger_->log_info( - "Site2Site transaction %s peer confirm transaction with CRC %s", - transactionID.c_str(), message.c_str()); + logger_->log_info("Site2Site transaction %s peer confirm transaction with CRC %s", transactionID.c_str(), message.c_str()); if (this->_currentVersion > 3) { int64_t crcValue = transaction->getCRC(); std::string crc = std::to_string(crcValue); if (message == crc) { - logger_->log_info("Site2Site transaction %s CRC matched", - transactionID.c_str()); + logger_->log_info("Site2Site transaction %s CRC matched", transactionID.c_str()); ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); if (ret <= 0) return false; transaction->_state = TRANSACTION_CONFIRMED; return true; } else { - logger_->log_info("Site2Site transaction %s CRC not matched %s", - transactionID.c_str(), crc.c_str()); + logger_->log_info("Site2Site transaction %s CRC not matched %s", transactionID.c_str(), crc.c_str()); ret = writeRespond(BAD_CHECKSUM, "BAD_CHECKSUM"); /* ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); @@ -1032,8 +943,7 @@ bool Site2SiteClientProtocol::confirm(std::string transactionID) { transaction->_state = TRANSACTION_CONFIRMED; return true; } else { - logger_->log_info("Site2Site transaction %s peer unknown respond code %d", - transactionID.c_str(), code); + logger_->log_info("Site2Site transaction %s peer unknown respond code %d", transactionID.c_str(), code); return false; } return false; @@ -1047,8 +957,7 @@ void Site2SiteClientProtocol::cancel(std::string transactionID) { return; } - std::map<std::string, Transaction *>::iterator it = - this->_transactionMap.find(transactionID); + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); if (it == _transactionMap.end()) { return; @@ -1056,9 +965,7 @@ void Site2SiteClientProtocol::cancel(std::string transactionID) { transaction = it->second; } - if (transaction->getState() == TRANSACTION_CANCELED - || transaction->getState() == TRANSACTION_COMPLETED - || transaction->getState() == TRANSACTION_ERROR) { + if (transaction->getState() == TRANSACTION_CANCELED || transaction->getState() == TRANSACTION_COMPLETED || transaction->getState() == TRANSACTION_ERROR) { return; } @@ -1072,8 +979,7 @@ void Site2SiteClientProtocol::cancel(std::string transactionID) { void Site2SiteClientProtocol::deleteTransaction(std::string transactionID) { Transaction *transaction = NULL; - std::map<std::string, Transaction *>::iterator it = - this->_transactionMap.find(transactionID); + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); if (it == _transactionMap.end()) { return; @@ -1081,8 +987,7 @@ void Site2SiteClientProtocol::deleteTransaction(std::string transactionID) { transaction = it->second; } - logger_->log_info("Site2Site delete transaction %s", - transaction->getUUIDStr().c_str()); + logger_->log_info("Site2Site delete transaction %s", transaction->getUUIDStr().c_str()); delete transaction; _transactionMap.erase(transactionID); } @@ -1090,8 +995,7 @@ void Site2SiteClientProtocol::deleteTransaction(std::string transactionID) { void Site2SiteClientProtocol::error(std::string transactionID) { Transaction *transaction = NULL; - std::map<std::string, Transaction *>::iterator it = - this->_transactionMap.find(transactionID); + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); if (it == _transactionMap.end()) { return; @@ -1117,8 +1021,7 @@ bool Site2SiteClientProtocol::complete(std::string transactionID) { return false; } - std::map<std::string, Transaction *>::iterator it = - this->_transactionMap.find(transactionID); + std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); if (it == _transactionMap.end()) { return false; @@ -1135,8 +1038,7 @@ bool Site2SiteClientProtocol::complete(std::string transactionID) { transaction->_state = TRANSACTION_COMPLETED; return true; } else { - logger_->log_info("Site2Site transaction %s send finished", - transactionID.c_str()); + logger_->log_info("Site2Site transaction %s send finished", transactionID.c_str()); ret = this->writeRespond(TRANSACTION_FINISHED, "Finished"); if (ret <= 0) { return false; @@ -1156,22 +1058,18 @@ bool Site2SiteClientProtocol::complete(std::string transactionID) { return false; if (code == TRANSACTION_FINISHED) { - logger_->log_info("Site2Site transaction %s peer finished transaction", - transactionID.c_str()); + logger_->log_info("Site2Site transaction %s peer finished transaction", transactionID.c_str()); transaction->_state = TRANSACTION_COMPLETED; return true; } else { - logger_->log_info("Site2Site transaction %s peer unknown respond code %d", - transactionID.c_str(), code); + logger_->log_info("Site2Site transaction %s peer unknown respond code %d", transactionID.c_str(), code); return false; } } } -void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, - core::ProcessSession *session) { - std::shared_ptr<FlowFileRecord> flow = - std::static_pointer_cast<FlowFileRecord>(session->get()); +void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, core::ProcessSession *session) { + std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast<FlowFileRecord>(session->get()); Transaction *transaction = NULL; @@ -1185,8 +1083,7 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, if (_peerState != READY) { context->yield(); tearDown(); - throw Exception(SITE2SITE_EXCEPTION, - "Can not establish handshake with peer"); + throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer"); return; } @@ -1214,14 +1111,11 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, throw Exception(SITE2SITE_EXCEPTION, "Send Failed"); return; } - logger_->log_info("Site2Site transaction %s send flow record %s", - transactionID.c_str(), flow->getUUIDStr().c_str()); + logger_->log_info("Site2Site transaction %s send flow record %s", transactionID.c_str(), flow->getUUIDStr().c_str()); uint64_t endTime = getTimeMillis(); std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr(); - std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" - + peer_->getHostName(); - session->getProvenanceReporter()->send(flow, transitUri, details, - endTime - startTime, false); + std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" + peer_->getHostName(); + session->getProvenanceReporter()->send(flow, transitUri, details, endTime - startTime, false); session->remove(flow); uint64_t transferNanos = getTimeNano() - startSendingNanos; @@ -1243,9 +1137,7 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, throw Exception(SITE2SITE_EXCEPTION, "Complete Failed"); return; } - logger_->log_info( - "Site2Site transaction %s successfully send flow record %d, content bytes %d", - transactionID.c_str(), transaction->_transfers, transaction->_bytes); + logger_->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", transactionID.c_str(), transaction->_transfers, transaction->_bytes); } catch (std::exception &exception) { if (transaction) deleteTransaction(transactionID); @@ -1258,8 +1150,7 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, deleteTransaction(transactionID); context->yield(); tearDown(); - logger_->log_debug( - "Caught Exception during Site2SiteClientProtocol::transferFlowFiles"); + logger_->log_debug("Caught Exception during Site2SiteClientProtocol::transferFlowFiles"); throw; } @@ -1268,9 +1159,7 @@ void Site2SiteClientProtocol::transferFlowFiles(core::ProcessContext *context, return; } -void Site2SiteClientProtocol::transferString(core::ProcessContext *context, - core::ProcessSession *session, std::string &payload, - std::map<std::string, std::string> attributes) { +void Site2SiteClientProtocol::transferString(core::ProcessContext *context, core::ProcessSession *session, std::string &payload, std::map<std::string, std::string> attributes) { Transaction *transaction = NULL; if (payload.length() <= 0) @@ -1283,8 +1172,7 @@ void Site2SiteClientProtocol::transferString(core::ProcessContext *context, if (_peerState != READY) { context->yield(); tearDown(); - throw Exception(SITE2SITE_EXCEPTION, - "Can not establish handshake with peer"); + throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer"); return; } @@ -1306,8 +1194,7 @@ void Site2SiteClientProtocol::transferString(core::ProcessContext *context, throw Exception(SITE2SITE_EXCEPTION, "Send Failed"); return; } - logger_->log_info("Site2Site transaction %s send bytes length %d", - transactionID.c_str(), payload.length()); + logger_->log_info("Site2Site transaction %s send bytes length %d", transactionID.c_str(), payload.length()); if (!confirm(transactionID)) { throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed"); @@ -1317,9 +1204,7 @@ void Site2SiteClientProtocol::transferString(core::ProcessContext *context, throw Exception(SITE2SITE_EXCEPTION, "Complete Failed"); return; } - logger_->log_info( - "Site2Site transaction %s successfully send flow record %d, content bytes %d", - transactionID.c_str(), transaction->_transfers, transaction->_bytes); + logger_->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", transactionID.c_str(), transaction->_transfers, transaction->_bytes); } catch (std::exception &exception) { if (transaction) deleteTransaction(transactionID); @@ -1332,8 +1217,7 @@ void Site2SiteClientProtocol::transferString(core::ProcessContext *context, deleteTransaction(transactionID); context->yield(); tearDown(); - logger_->log_debug( - "Caught Exception during Site2SiteClientProtocol::transferBytes"); + logger_->log_debug("Caught Exception during Site2SiteClientProtocol::transferBytes"); throw; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/Site2SitePeer.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Site2SitePeer.cpp b/libminifi/src/Site2SitePeer.cpp index 551d466..7c46564 100644 --- a/libminifi/src/Site2SitePeer.cpp +++ b/libminifi/src/Site2SitePeer.cpp @@ -44,9 +44,7 @@ bool Site2SitePeer::Open() { uint16_t data_size = sizeof MAGIC_BYTES; - if (stream_->writeData( - reinterpret_cast<uint8_t *>(const_cast<char*>(MAGIC_BYTES)), data_size) - != data_size) { + if (stream_->writeData(reinterpret_cast<uint8_t *>(const_cast<char*>(MAGIC_BYTES)), data_size) != data_size) { return false; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/ThreadedSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp index 7e9bb03..46a4710 100644 --- a/libminifi/src/ThreadedSchedulingAgent.cpp +++ b/libminifi/src/ThreadedSchedulingAgent.cpp @@ -35,103 +35,77 @@ namespace apache { namespace nifi { namespace minifi { -void ThreadedSchedulingAgent::schedule( - std::shared_ptr<core::Processor> processor) { +void ThreadedSchedulingAgent::schedule(std::shared_ptr<core::Processor> processor) { std::lock_guard<std::mutex> lock(mutex_); admin_yield_duration_ = 0; std::string yieldValue; - if (configure_->get(Configure::nifi_administrative_yield_duration, - yieldValue)) { + if (configure_->get(Configure::nifi_administrative_yield_duration, yieldValue)) { core::TimeUnit unit; - if (core::Property::StringToTime(yieldValue, admin_yield_duration_, unit) - && core::Property::ConvertTimeUnitToMS(admin_yield_duration_, unit, - admin_yield_duration_)) { - logger_->log_debug("nifi_administrative_yield_duration: [%d] ms", - admin_yield_duration_); + if (core::Property::StringToTime(yieldValue, admin_yield_duration_, unit) && core::Property::ConvertTimeUnitToMS(admin_yield_duration_, unit, admin_yield_duration_)) { + logger_->log_debug("nifi_administrative_yield_duration: [%d] ms", admin_yield_duration_); } } bored_yield_duration_ = 0; if (configure_->get(Configure::nifi_bored_yield_duration, yieldValue)) { core::TimeUnit unit; - if (core::Property::StringToTime(yieldValue, bored_yield_duration_, unit) - && core::Property::ConvertTimeUnitToMS(bored_yield_duration_, unit, - bored_yield_duration_)) { - logger_->log_debug("nifi_bored_yield_duration: [%d] ms", - bored_yield_duration_); + if (core::Property::StringToTime(yieldValue, bored_yield_duration_, unit) && core::Property::ConvertTimeUnitToMS(bored_yield_duration_, unit, bored_yield_duration_)) { + logger_->log_debug("nifi_bored_yield_duration: [%d] ms", bored_yield_duration_); } } if (processor->getScheduledState() != core::RUNNING) { - logger_->log_info( - "Can not schedule threads for processor %s because it is not running", - processor->getName().c_str()); + logger_->log_info("Can not schedule threads for processor %s because it is not running", processor->getName().c_str()); return; } - std::map<std::string, std::vector<std::thread *>>::iterator it = - _threads.find(processor->getUUIDStr()); + std::map<std::string, std::vector<std::thread *>>::iterator it = _threads.find(processor->getUUIDStr()); if (it != _threads.end()) { - logger_->log_info( - "Can not schedule threads for processor %s because there are existing threads running"); + logger_->log_info("Can not schedule threads for processor %s because there are existing threads running"); return; } core::ProcessorNode processor_node(processor); - auto processContext = std::make_shared<core::ProcessContext>( - processor_node, controller_service_provider_, repo_); - auto sessionFactory = std::make_shared<core::ProcessSessionFactory>( - processContext.get()); + auto processContext = std::make_shared<core::ProcessContext>(processor_node, controller_service_provider_, repo_); + auto sessionFactory = std::make_shared<core::ProcessSessionFactory>(processContext.get()); processor->onSchedule(processContext.get(), sessionFactory.get()); std::vector<std::thread *> threads; for (int i = 0; i < processor->getMaxConcurrentTasks(); i++) { ThreadedSchedulingAgent *agent = this; - std::thread *thread = new std::thread( - [agent, processor, processContext, sessionFactory] () { - agent->run(processor, processContext.get(), sessionFactory.get()); - }); + std::thread *thread = new std::thread([agent, processor, processContext, sessionFactory] () { + agent->run(processor, processContext.get(), sessionFactory.get()); + }); thread->detach(); threads.push_back(thread); - logger_->log_info("Scheduled thread %d running for process %s", - thread->get_id(), processor->getName().c_str()); + logger_->log_info("Scheduled thread %d running for process %s", thread->get_id(), processor->getName().c_str()); } _threads[processor->getUUIDStr().c_str()] = threads; return; } -void ThreadedSchedulingAgent::unschedule( - std::shared_ptr<core::Processor> processor) { +void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> processor) { std::lock_guard<std::mutex> lock(mutex_); - logger_->log_info("Shutting down threads for processor %s/%s", - processor->getName().c_str(), - processor->getUUIDStr().c_str()); + logger_->log_info("Shutting down threads for processor %s/%s", processor->getName().c_str(), processor->getUUIDStr().c_str()); if (processor->getScheduledState() != core::RUNNING) { - logger_->log_info( - "Cannot unschedule threads for processor %s because it is not running", - processor->getName().c_str()); + logger_->log_info("Cannot unschedule threads for processor %s because it is not running", processor->getName().c_str()); return; } - std::map<std::string, std::vector<std::thread *>>::iterator it = - _threads.find(processor->getUUIDStr()); + std::map<std::string, std::vector<std::thread *>>::iterator it = _threads.find(processor->getUUIDStr()); if (it == _threads.end()) { - logger_->log_info( - "Cannot unschedule threads for processor %s because there are no existing threads running", - processor->getName().c_str()); + logger_->log_info("Cannot unschedule threads for processor %s because there are no existing threads running", processor->getName().c_str()); return; } - for (std::vector<std::thread *>::iterator itThread = it->second.begin(); - itThread != it->second.end(); ++itThread) { + for (std::vector<std::thread *>::iterator itThread = it->second.begin(); itThread != it->second.end(); ++itThread) { std::thread *thread = *itThread; - logger_->log_info("Scheduled thread %d deleted for process %s", - thread->get_id(), processor->getName().c_str()); + logger_->log_info("Scheduled thread %d deleted for process %s", thread->get_id(), processor->getName().c_str()); delete thread; } _threads.erase(processor->getUUIDStr()); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/TimerDrivenSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp index 8610e64..b9a41ea 100644 --- a/libminifi/src/TimerDrivenSchedulingAgent.cpp +++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp @@ -29,24 +29,17 @@ namespace apache { namespace nifi { namespace minifi { -void TimerDrivenSchedulingAgent::run( - std::shared_ptr<core::Processor> processor, - core::ProcessContext *processContext, - core::ProcessSessionFactory *sessionFactory) { +void TimerDrivenSchedulingAgent::run(std::shared_ptr<core::Processor> processor, core::ProcessContext *processContext, core::ProcessSessionFactory *sessionFactory) { while (this->running_) { - bool shouldYield = this->onTrigger(processor, processContext, - sessionFactory); + bool shouldYield = this->onTrigger(processor, processContext, sessionFactory); if (processor->isYield()) { // Honor the yield - std::this_thread::sleep_for( - std::chrono::milliseconds(processor->getYieldTime())); + std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime())); } else if (shouldYield && this->bored_yield_duration_ > 0) { // No work to do or need to apply back pressure - std::this_thread::sleep_for( - std::chrono::milliseconds(this->bored_yield_duration_)); + std::this_thread::sleep_for(std::chrono::milliseconds(this->bored_yield_duration_)); } - std::this_thread::sleep_for( - std::chrono::nanoseconds(processor->getSchedulingPeriodNano())); + std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano())); } return; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/controllers/SSLContextService.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/controllers/SSLContextService.cpp b/libminifi/src/controllers/SSLContextService.cpp index 51a7cc4..a9450f6 100644 --- a/libminifi/src/controllers/SSLContextService.cpp +++ b/libminifi/src/controllers/SSLContextService.cpp @@ -51,10 +51,8 @@ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() { method = TLSv1_2_client_method(); SSL_CTX *ctx = SSL_CTX_new(method); - if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM) - <= 0) { - logger_->log_error("Could not create load certificate, error : %s", - std::strerror(errno)); + if (SSL_CTX_use_certificate_file(ctx, certificate.c_str(), SSL_FILETYPE_PEM) <= 0) { + logger_->log_error("Could not create load certificate, error : %s", std::strerror(errno)); return nullptr; } if (!IsNullOrEmpty(passphrase_)) { @@ -62,25 +60,20 @@ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() { SSL_CTX_set_default_passwd_cb(ctx, pemPassWordCb); } - int retp = SSL_CTX_use_PrivateKey_file(ctx, private_key_.c_str(), - SSL_FILETYPE_PEM); + int retp = SSL_CTX_use_PrivateKey_file(ctx, private_key_.c_str(), SSL_FILETYPE_PEM); if (retp != 1) { - logger_->log_error("Could not create load private key,%i on %s error : %s", - retp, private_key_, std::strerror(errno)); + logger_->log_error("Could not create load private key,%i on %s error : %s", retp, private_key_, std::strerror(errno)); return nullptr; } if (!SSL_CTX_check_private_key(ctx)) { - logger_->log_error( - "Private key does not match the public certificate, error : %s", - std::strerror(errno)); + logger_->log_error("Private key does not match the public certificate, error : %s", std::strerror(errno)); return nullptr; } retp = SSL_CTX_load_verify_locations(ctx, ca_certificate_.c_str(), 0); if (retp == 0) { - logger_->log_error("Can not load CA certificate, Exiting, error : %s", - std::strerror(errno)); + logger_->log_error("Can not load CA certificate, Exiting, error : %s", std::strerror(errno)); } return std::unique_ptr<SSLContext>(new SSLContext(ctx)); } @@ -114,20 +107,16 @@ void SSLContextService::onEnable() { valid_ = true; core::Property property("Client Certificate", "Client Certificate"); core::Property privKey("Private Key", "Private Key file"); - core::Property passphrase_prop( - "Passphrase", "Client passphrase. Either a file or unencrypted text"); + core::Property passphrase_prop("Passphrase", "Client passphrase. Either a file or unencrypted text"); core::Property caCert("CA Certificate", "CA certificate file"); std::string default_dir; if (nullptr != configuration_) - configuration_->get(Configure::nifi_default_directory, default_dir); + configuration_->get(Configure::nifi_default_directory, default_dir); logger_->log_trace("onEnable()"); - if (getProperty(property.getName(), certificate) - && getProperty(privKey.getName(), private_key_)) { - logger_->log_error( - "Certificate and Private Key PEM file not configured, error: %s.", - std::strerror(errno)); + if (getProperty(property.getName(), certificate) && getProperty(privKey.getName(), private_key_)) { + logger_->log_error("Certificate and Private Key PEM file not configured, error: %s.", std::strerror(errno)); std::ifstream cert_file(certificate); std::ifstream priv_file(private_key_); @@ -168,17 +157,14 @@ void SSLContextService::onEnable() { if (passphrase_file.good()) { passphrase_file_ = passphrase_; // we should read it from the file - passphrase_.assign((std::istreambuf_iterator<char>(passphrase_file)), - std::istreambuf_iterator<char>()); + passphrase_.assign((std::istreambuf_iterator<char>(passphrase_file)), std::istreambuf_iterator<char>()); } else { std::string test_passphrase = default_dir + passphrase_; std::ifstream passphrase_file_test(test_passphrase); if (passphrase_file_test.good()) { passphrase_ = test_passphrase; passphrase_file_ = test_passphrase; - passphrase_.assign( - (std::istreambuf_iterator<char>(passphrase_file_test)), - std::istreambuf_iterator<char>()); + passphrase_.assign((std::istreambuf_iterator<char>(passphrase_file_test)), std::istreambuf_iterator<char>()); } else { valid_ = false; } @@ -208,8 +194,7 @@ void SSLContextService::onEnable() { void SSLContextService::initializeTLS() { core::Property property("Client Certificate", "Client Certificate"); core::Property privKey("Private Key", "Private Key file"); - core::Property passphrase_prop( - "Passphrase", "Client passphrase. Either a file or unencrypted text"); + core::Property passphrase_prop("Passphrase", "Client passphrase. Either a file or unencrypted text"); core::Property caCert("CA Certificate", "CA certificate file"); std::set<core::Property> supportedProperties; supportedProperties.insert(property); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/ClassLoader.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ClassLoader.cpp b/libminifi/src/core/ClassLoader.cpp index 72ac80c..9bead0e 100644 --- a/libminifi/src/core/ClassLoader.cpp +++ b/libminifi/src/core/ClassLoader.cpp @@ -28,7 +28,9 @@ namespace nifi { namespace minifi { namespace core { -ClassLoader::ClassLoader() : logger_(logging::LoggerFactory<ClassLoader>::getLogger()) {} +ClassLoader::ClassLoader() + : logger_(logging::LoggerFactory<ClassLoader>::getLogger()) { +} ClassLoader &ClassLoader::getDefaultClassLoader() { static ClassLoader ret; @@ -49,8 +51,7 @@ uint16_t ClassLoader::registerResource(const std::string &resource) { dlerror(); // load the symbols - createFactory* create_factory_func = reinterpret_cast<createFactory*>(dlsym( - resource_ptr, "createFactory")); + createFactory* create_factory_func = reinterpret_cast<createFactory*>(dlsym(resource_ptr, "createFactory")); const char* dlsym_error = dlerror(); if (dlsym_error) { logger_->log_error("Cannot load library: %s", dlsym_error); @@ -61,8 +62,7 @@ uint16_t ClassLoader::registerResource(const std::string &resource) { std::lock_guard<std::mutex> lock(internal_mutex_); - loaded_factories_[factory->getClassName()] = std::unique_ptr<ObjectFactory>( - factory); + loaded_factories_[factory->getClassName()] = std::unique_ptr<ObjectFactory>(factory); return RESOURCE_SUCCESS; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/ConfigurableComponent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp index 94fa599..f5247ac 100644 --- a/libminifi/src/core/ConfigurableComponent.cpp +++ b/libminifi/src/core/ConfigurableComponent.cpp @@ -33,8 +33,7 @@ ConfigurableComponent::ConfigurableComponent() : logger_(logging::LoggerFactory<ConfigurableComponent>::getLogger()) { } -ConfigurableComponent::ConfigurableComponent( - const ConfigurableComponent &&other) +ConfigurableComponent::ConfigurableComponent(const ConfigurableComponent &&other) : properties_(std::move(other.properties_)), logger_(logging::LoggerFactory<ConfigurableComponent>::getLogger()) { } @@ -42,8 +41,7 @@ ConfigurableComponent::ConfigurableComponent( ConfigurableComponent::~ConfigurableComponent() { } -bool ConfigurableComponent::getProperty(const std::string &name, - Property &prop) { +bool ConfigurableComponent::getProperty(const std::string &name, Property &prop) { std::lock_guard<std::mutex> lock(configuration_mutex_); auto &&it = properties_.find(name); @@ -62,16 +60,14 @@ bool ConfigurableComponent::getProperty(const std::string &name, * @param value value passed in by reference * @return result of getting property. */ -bool ConfigurableComponent::getProperty(const std::string name, - std::string &value) { +bool ConfigurableComponent::getProperty(const std::string name, std::string &value) { std::lock_guard<std::mutex> lock(configuration_mutex_); auto &&it = properties_.find(name); if (it != properties_.end()) { Property item = it->second; value = item.getValue(); - logger_->log_info("Processor %s property name %s value %s", name, - item.getName(), value); + logger_->log_info("Processor %s property name %s value %s", name, item.getName(), value); return true; } else { return false; @@ -83,8 +79,7 @@ bool ConfigurableComponent::getProperty(const std::string name, * @param value property value. * @return result of setting property. */ -bool ConfigurableComponent::setProperty(const std::string name, - std::string value) { +bool ConfigurableComponent::setProperty(const std::string name, std::string value) { std::lock_guard<std::mutex> lock(configuration_mutex_); auto &&it = properties_.find(name); @@ -92,8 +87,7 @@ bool ConfigurableComponent::setProperty(const std::string name, Property item = it->second; item.setValue(value); properties_[item.getName()] = item; - logger_->log_info("Component %s property name %s value %s", name.c_str(), - item.getName().c_str(), value.c_str()); + logger_->log_info("Component %s property name %s value %s", name.c_str(), item.getName().c_str(), value.c_str()); return true; } else { return false; @@ -106,8 +100,7 @@ bool ConfigurableComponent::setProperty(const std::string name, * @param value property value. * @return result of setting property. */ -bool ConfigurableComponent::updateProperty(const std::string &name, - const std::string &value) { +bool ConfigurableComponent::updateProperty(const std::string &name, const std::string &value) { std::lock_guard<std::mutex> lock(configuration_mutex_); auto &&it = properties_.find(name); @@ -115,8 +108,7 @@ bool ConfigurableComponent::updateProperty(const std::string &name, Property item = it->second; item.addValue(value); properties_[item.getName()] = item; - logger_->log_info("Component %s property name %s value %s", name.c_str(), - item.getName().c_str(), value.c_str()); + logger_->log_info("Component %s property name %s value %s", name.c_str(), item.getName().c_str(), value.c_str()); return true; } else { return false; @@ -137,14 +129,12 @@ bool ConfigurableComponent::setProperty(Property &prop, std::string value) { Property item = it->second; item.setValue(value); properties_[item.getName()] = item; - logger_->log_info("property name %s value %s", prop.getName().c_str(), - item.getName().c_str(), value.c_str()); + logger_->log_info("property name %s value %s", prop.getName().c_str(), item.getName().c_str(), value.c_str()); return true; } else { Property newProp(prop); newProp.setValue(value); - properties_.insert( - std::pair<std::string, Property>(prop.getName(), newProp)); + properties_.insert(std::pair<std::string, Property>(prop.getName(), newProp)); return true; } return false; @@ -155,8 +145,7 @@ bool ConfigurableComponent::setProperty(Property &prop, std::string value) { * @param supported properties * @return result of set operation. */ -bool ConfigurableComponent::setSupportedProperties( - std::set<Property> properties) { +bool ConfigurableComponent::setSupportedProperties(std::set<Property> properties) { if (!canEdit()) { return false; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/ConfigurationFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp index 6aa42e3..ea2ed5c 100644 --- a/libminifi/src/core/ConfigurationFactory.cpp +++ b/libminifi/src/core/ConfigurationFactory.cpp @@ -39,50 +39,34 @@ namespace core { class YamlConfiguration; #endif -std::unique_ptr<core::FlowConfiguration> createFlowConfiguration( - std::shared_ptr<core::Repository> repo, - std::shared_ptr<core::Repository> flow_file_repo, - std::shared_ptr<Configure> configure, - std::shared_ptr<io::StreamFactory> stream_factory, - const std::string configuration_class_name, const std::string path, - bool fail_safe) { - +std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, + std::shared_ptr<io::StreamFactory> stream_factory, const std::string configuration_class_name, const std::string path, + bool fail_safe) { std::string class_name_lc = configuration_class_name; - std::transform(class_name_lc.begin(), class_name_lc.end(), - class_name_lc.begin(), ::tolower); + std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower); try { if (class_name_lc == "flowconfiguration") { // load the base configuration. - return std::unique_ptr<core::FlowConfiguration>( - new core::FlowConfiguration(repo, flow_file_repo, stream_factory, - configure, path)); + return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, stream_factory, configure, path)); } else if (class_name_lc == "yamlconfiguration") { // only load if the class is defined. - return std::unique_ptr<core::FlowConfiguration>( - instantiate<core::YamlConfiguration>(repo, flow_file_repo, - stream_factory, configure, path)); + return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(repo, flow_file_repo, stream_factory, configure, path)); } else { if (fail_safe) { - return std::unique_ptr<core::FlowConfiguration>( - new core::FlowConfiguration(repo, flow_file_repo, stream_factory, - configure, path)); + return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, stream_factory, configure, path)); } else { - throw std::runtime_error( - "Support for the provided configuration class could not be found"); + throw std::runtime_error("Support for the provided configuration class could not be found"); } } } catch (const std::runtime_error &r) { if (fail_safe) { - return std::unique_ptr<core::FlowConfiguration>( - new core::FlowConfiguration(repo, flow_file_repo, stream_factory, - configure, path)); + return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, stream_factory, configure, path)); } } - throw std::runtime_error( - "Support for the provided configuration class could not be found"); + throw std::runtime_error("Support for the provided configuration class could not be found"); } } /* namespace core */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/Connectable.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Connectable.cpp b/libminifi/src/core/Connectable.cpp index 5bd32e9..cf01f0c 100644 --- a/libminifi/src/core/Connectable.cpp +++ b/libminifi/src/core/Connectable.cpp @@ -47,12 +47,9 @@ Connectable::Connectable(const Connectable &&other) Connectable::~Connectable() { } -bool Connectable::setSupportedRelationships( - std::set<core::Relationship> relationships) { +bool Connectable::setSupportedRelationships(std::set<core::Relationship> relationships) { if (isRunning()) { - logger_->log_info( - "Can not set processor supported relationship while the process %s is running", - name_.c_str()); + logger_->log_info("Can not set processor supported relationship while the process %s is running", name_.c_str()); return false; } @@ -61,8 +58,7 @@ bool Connectable::setSupportedRelationships( relationships_.clear(); for (auto item : relationships) { relationships_[item.getName()] = item; - logger_->log_info("Processor %s supported relationship name %s", - name_.c_str(), item.getName().c_str()); + logger_->log_info("Processor %s supported relationship name %s", name_.c_str(), item.getName().c_str()); } return true; } @@ -71,10 +67,7 @@ bool Connectable::setSupportedRelationships( bool Connectable::isSupportedRelationship(core::Relationship relationship) { const bool requiresLock = isRunning(); - const auto conditionalLock = - !requiresLock ? - std::unique_lock<std::mutex>() : - std::unique_lock<std::mutex>(relationship_mutex_); + const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock<std::mutex>(relationship_mutex_); const auto &it = relationships_.find(relationship.getName()); if (it != relationships_.end()) { @@ -84,12 +77,9 @@ bool Connectable::isSupportedRelationship(core::Relationship relationship) { } } -bool Connectable::setAutoTerminatedRelationships( - std::set<Relationship> relationships) { +bool Connectable::setAutoTerminatedRelationships(std::set<Relationship> relationships) { if (isRunning()) { - logger_->log_info( - "Can not set processor auto terminated relationship while the process %s is running", - name_.c_str()); + logger_->log_info("Can not set processor auto terminated relationship while the process %s is running", name_.c_str()); return false; } @@ -98,8 +88,7 @@ bool Connectable::setAutoTerminatedRelationships( auto_terminated_relationships_.clear(); for (auto item : relationships) { auto_terminated_relationships_[item.getName()] = item; - logger_->log_info("Processor %s auto terminated relationship name %s", - name_.c_str(), item.getName().c_str()); + logger_->log_info("Processor %s auto terminated relationship name %s", name_.c_str(), item.getName().c_str()); } return true; } @@ -108,10 +97,7 @@ bool Connectable::setAutoTerminatedRelationships( bool Connectable::isAutoTerminated(core::Relationship relationship) { const bool requiresLock = isRunning(); - const auto conditionalLock = - !requiresLock ? - std::unique_lock<std::mutex>() : - std::unique_lock<std::mutex>(relationship_mutex_); + const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock<std::mutex>(relationship_mutex_); const auto &it = auto_terminated_relationships_.find(relationship.getName()); if (it != auto_terminated_relationships_.end()) { @@ -126,8 +112,7 @@ void Connectable::waitForWork(uint64_t timeoutMs) { if (!has_work_.load()) { std::unique_lock<std::mutex> lock(work_available_mutex_); - work_condition_.wait_for(lock, std::chrono::milliseconds(timeoutMs), - [&] {return has_work_.load();}); + work_condition_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [&] {return has_work_.load();}); } } @@ -146,8 +131,7 @@ void Connectable::notifyWork() { } } -std::set<std::shared_ptr<Connectable>> Connectable::getOutGoingConnections( - std::string relationship) { +std::set<std::shared_ptr<Connectable>> Connectable::getOutGoingConnections(std::string relationship) { std::set<std::shared_ptr<Connectable>> empty; auto &&it = out_going_connections_.find(relationship); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/FlowConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index 1ed9176..6635701 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -30,14 +30,12 @@ namespace core { FlowConfiguration::~FlowConfiguration() { } -std::shared_ptr<core::Processor> FlowConfiguration::createProcessor( - std::string name, uuid_t uuid) { +std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string name, uuid_t uuid) { auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(name, uuid); if (nullptr == ptr) { logger_->log_error("No Processor defined for %s", name.c_str()); } - std::shared_ptr<core::Processor> processor = std::static_pointer_cast< - core::Processor>(ptr); + std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr); // initialize the processor processor->initialize(); @@ -48,37 +46,27 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProcessor( std::shared_ptr<core::Processor> FlowConfiguration::createProvenanceReportTask() { std::shared_ptr<core::Processor> processor = nullptr; - processor = - std::make_shared< - org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>( - stream_factory_); + processor = std::make_shared<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(stream_factory_); // initialize the processor processor->initialize(); return processor; } -std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup( - std::string name, uuid_t uuid) { - return std::unique_ptr<core::ProcessGroup>( - new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid)); +std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(std::string name, uuid_t uuid) { + return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid)); } -std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup( - std::string name, uuid_t uuid) { - return std::unique_ptr<core::ProcessGroup>( - new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid)); +std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(std::string name, uuid_t uuid) { + return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid)); } -std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection( - std::string name, uuid_t uuid) { +std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(std::string name, uuid_t uuid) { return std::make_shared<minifi::Connection>(flow_file_repo_, name, uuid); } -std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService( - const std::string &class_name, const std::string &name, uuid_t uuid) { - std::shared_ptr<core::controller::ControllerServiceNode> controllerServicesNode = - service_provider_->createControllerService(class_name, name, true); +std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(const std::string &class_name, const std::string &name, uuid_t uuid) { + std::shared_ptr<core::controller::ControllerServiceNode> controllerServicesNode = service_provider_->createControllerService(class_name, name, true); if (nullptr != controllerServicesNode) controllerServicesNode->setUUID(uuid); return controllerServicesNode; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/77a20dbe/libminifi/src/core/FlowFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp index 8c693bf..af73b91 100644 --- a/libminifi/src/core/FlowFile.cpp +++ b/libminifi/src/core/FlowFile.cpp @@ -178,8 +178,7 @@ void FlowFile::setLineageStartDate(const uint64_t date) { * Sets the original connection with a shared pointer. * @param connection shared connection. */ -void FlowFile::setOriginalConnection( - std::shared_ptr<core::Connectable> &connection) { +void FlowFile::setOriginalConnection(std::shared_ptr<core::Connectable> &connection) { original_connection_ = connection; }
