http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/Site2SiteClientProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Site2SiteClientProtocol.cpp b/libminifi/src/Site2SiteClientProtocol.cpp index bd4de97..e0265bb 100644 --- a/libminifi/src/Site2SiteClientProtocol.cpp +++ b/libminifi/src/Site2SiteClientProtocol.cpp @@ -29,1308 +29,1219 @@ #include "Site2SitePeer.h" #include "Site2SiteClientProtocol.h" -bool Site2SiteClientProtocol::establish() -{ - if (_peerState != IDLE) - { - logger_->log_error("Site2Site peer state is not idle while try to establish"); - return false; - } - - bool ret = peer_->Open(); - - if (!ret) - { - logger_->log_error("Site2Site peer socket open failed"); - return false; - } - - // Negotiate the version - ret = initiateResourceNegotiation(); - - if (!ret) - { - logger_->log_error("Site2Site Protocol Version Negotiation failed"); - /* - peer_->yield(); - tearDown(); */ - return false; - } - - logger_->log_info("Site2Site socket established"); - _peerState = ESTABLISHED; - - return true; +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +bool Site2SiteClientProtocol::establish() { + if (_peerState != IDLE) { + logger_->log_error( + "Site2Site peer state is not idle while try to establish"); + return false; + } + + bool ret = peer_->Open(); + + if (!ret) { + logger_->log_error("Site2Site peer socket open failed"); + return false; + } + + // Negotiate the version + ret = initiateResourceNegotiation(); + + if (!ret) { + logger_->log_error("Site2Site Protocol Version Negotiation failed"); + /* + peer_->yield(); + tearDown(); */ + return false; + } + + logger_->log_info("Site2Site socket established"); + _peerState = ESTABLISHED; + + return true; } -bool Site2SiteClientProtocol::initiateResourceNegotiation() -{ - // Negotiate the version - if (_peerState != IDLE) - { - logger_->log_error("Site2Site peer state is not idle while initiateResourceNegotiation"); - return false; - } - - logger_->log_info("Negotiate protocol version with destination port %s current version %d", _portIdStr.c_str(), _currentVersion); - - int ret = peer_->writeUTF(this->getResourceName()); - - logger_->log_info("result of writing resource name is %i",ret); - if (ret <= 0) - { - logger_->log_debug("result of writing resource name is %i",ret); - // tearDown(); - return false; - } - - ret = peer_->write(_currentVersion); - - if (ret <= 0) - { - logger_->log_info("result of writing version is %i",ret); - // tearDown(); - return false; - } - - uint8_t statusCode; - ret = peer_->read(statusCode); - - if (ret <= 0) - { - logger_->log_info("result of writing version status code %i",ret); - // tearDown(); - return false; - } - logger_->log_info("status code is %i",statusCode); - switch (statusCode) - { - case RESOURCE_OK: - logger_->log_info("Site2Site Protocol Negotiate protocol version OK"); - return true; - case DIFFERENT_RESOURCE_VERSION: - uint32_t serverVersion; - ret = peer_->read(serverVersion); - if (ret <= 0) - { - // tearDown(); - return false; - } - logger_->log_info("Site2Site Server Response asked for a different protocol version %d", serverVersion); - for (unsigned int i = (_currentVersionIndex + 1); i < sizeof(_supportedVersion)/sizeof(uint32_t); i++) - { - if (serverVersion >= _supportedVersion[i]) - { - _currentVersion = _supportedVersion[i]; - _currentVersionIndex = i; - return initiateResourceNegotiation(); - } - } - ret = -1; - // tearDown(); - return false; - case NEGOTIATED_ABORT: - logger_->log_info("Site2Site Negotiate protocol response ABORT"); - ret = -1; - // tearDown(); - return false; - default: - logger_->log_info("Negotiate protocol response unknown code %d", statusCode); - return true; - } - - return true; +bool Site2SiteClientProtocol::initiateResourceNegotiation() { + // Negotiate the version + if (_peerState != IDLE) { + logger_->log_error( + "Site2Site peer state is not idle while initiateResourceNegotiation"); + return false; + } + + logger_->log_info( + "Negotiate protocol version with destination port %s current version %d", + _portIdStr.c_str(), _currentVersion); + + int ret = peer_->writeUTF(this->getResourceName()); + + logger_->log_info("result of writing resource name is %i", ret); + if (ret <= 0) { + logger_->log_debug("result of writing resource name is %i", ret); + // tearDown(); + return false; + } + + ret = peer_->write(_currentVersion); + + if (ret <= 0) { + logger_->log_info("result of writing version is %i", ret); + // tearDown(); + return false; + } + + uint8_t statusCode; + ret = peer_->read(statusCode); + + if (ret <= 0) { + logger_->log_info("result of writing version status code %i", ret); + // tearDown(); + return false; + } + logger_->log_info("status code is %i", statusCode); + switch (statusCode) { + case RESOURCE_OK: + logger_->log_info("Site2Site Protocol Negotiate protocol version OK"); + return true; + case DIFFERENT_RESOURCE_VERSION: + uint32_t serverVersion; + ret = peer_->read(serverVersion); + if (ret <= 0) { + // tearDown(); + return false; + } + logger_->log_info( + "Site2Site Server Response asked for a different protocol version %d", + serverVersion); + for (unsigned int i = (_currentVersionIndex + 1); + i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) { + if (serverVersion >= _supportedVersion[i]) { + _currentVersion = _supportedVersion[i]; + _currentVersionIndex = i; + return initiateResourceNegotiation(); + } + } + ret = -1; + // tearDown(); + return false; + case NEGOTIATED_ABORT: + logger_->log_info("Site2Site Negotiate protocol response ABORT"); + ret = -1; + // tearDown(); + return false; + default: + logger_->log_info("Negotiate protocol response unknown code %d", + statusCode); + return true; + } + + return true; } -bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() -{ - // Negotiate the version - if (_peerState != HANDSHAKED) - { - logger_->log_error("Site2Site peer state is not handshaked while initiateCodecResourceNegotiation"); - return false; - } - - logger_->log_info("Negotiate Codec version with destination port %s current version %d", _portIdStr.c_str(), _currentCodecVersion); - - int ret = peer_->writeUTF(this->getCodecResourceName()); - - if (ret <= 0) - { - logger_->log_debug("result of getCodecResourceName is %i",ret); - // tearDown(); - return false; - } - - ret = peer_->write(_currentCodecVersion); - - if (ret <= 0) - { - logger_->log_debug("result of _currentCodecVersion is %i",ret); - // tearDown(); - return false; - } - - uint8_t statusCode; - ret = peer_->read(statusCode); - - if (ret <= 0) - { - // tearDown(); - return false; - } - - switch (statusCode) - { - case RESOURCE_OK: - logger_->log_info("Site2Site Codec Negotiate version OK"); - return true; - case DIFFERENT_RESOURCE_VERSION: - uint32_t serverVersion; - ret = peer_->read(serverVersion); - if (ret <= 0) - { - // tearDown(); - return false; - } - logger_->log_info("Site2Site Server Response asked for a different codec version %d", serverVersion); - for (unsigned int i = (_currentCodecVersionIndex + 1); i < sizeof(_supportedCodecVersion)/sizeof(uint32_t); i++) - { - if (serverVersion >= _supportedCodecVersion[i]) - { - _currentCodecVersion = _supportedCodecVersion[i]; - _currentCodecVersionIndex = i; - return initiateCodecResourceNegotiation(); - } - } - ret = -1; - // tearDown(); - return false; - case NEGOTIATED_ABORT: - logger_->log_info("Site2Site Codec Negotiate response ABORT"); - ret = -1; - // tearDown(); - return false; - default: - logger_->log_info("Negotiate Codec response unknown code %d", statusCode); - return true; - } - - return true; +bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() { + // Negotiate the version + if (_peerState != HANDSHAKED) { + logger_->log_error( + "Site2Site peer state is not handshaked while initiateCodecResourceNegotiation"); + return false; + } + + logger_->log_info( + "Negotiate Codec version with destination port %s current version %d", + _portIdStr.c_str(), _currentCodecVersion); + + int ret = peer_->writeUTF(this->getCodecResourceName()); + + if (ret <= 0) { + logger_->log_debug("result of getCodecResourceName is %i", ret); + // tearDown(); + return false; + } + + ret = peer_->write(_currentCodecVersion); + + if (ret <= 0) { + logger_->log_debug("result of _currentCodecVersion is %i", ret); + // tearDown(); + return false; + } + + uint8_t statusCode; + ret = peer_->read(statusCode); + + if (ret <= 0) { + // tearDown(); + return false; + } + + switch (statusCode) { + case RESOURCE_OK: + logger_->log_info("Site2Site Codec Negotiate version OK"); + return true; + case DIFFERENT_RESOURCE_VERSION: + uint32_t serverVersion; + ret = peer_->read(serverVersion); + if (ret <= 0) { + // tearDown(); + return false; + } + logger_->log_info( + "Site2Site Server Response asked for a different codec version %d", + serverVersion); + for (unsigned int i = (_currentCodecVersionIndex + 1); + i < sizeof(_supportedCodecVersion) / sizeof(uint32_t); i++) { + if (serverVersion >= _supportedCodecVersion[i]) { + _currentCodecVersion = _supportedCodecVersion[i]; + _currentCodecVersionIndex = i; + return initiateCodecResourceNegotiation(); + } + } + ret = -1; + // tearDown(); + return false; + case NEGOTIATED_ABORT: + logger_->log_info("Site2Site Codec Negotiate response ABORT"); + ret = -1; + // tearDown(); + return false; + default: + logger_->log_info("Negotiate Codec response unknown code %d", statusCode); + return true; + } + + return true; } -bool Site2SiteClientProtocol::handShake() -{ - if (_peerState != ESTABLISHED) - { - logger_->log_error("Site2Site peer state is not established while handshake"); - return false; - } - logger_->log_info("Site2Site Protocol Perform hand shake with destination port %s", _portIdStr.c_str()); - uuid_t uuid; - // Generate the global UUID for the com identify - uuid_generate(uuid); - char uuidStr[37]; - uuid_unparse_lower(uuid, uuidStr); - _commsIdentifier = uuidStr; - - int ret = peer_->writeUTF(_commsIdentifier); - - if (ret <= 0) - { - // tearDown(); - return false; - } - - std::map<std::string, std::string> properties; - properties[HandShakePropertyStr[GZIP]] = "false"; - properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr; - properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(this->_timeOut); - if (this->_currentVersion >= 5) - { - if (this->_batchCount > 0) - properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string(this->_batchCount); - if (this->_batchSize > 0) - properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string(this->_batchSize); - if (this->_batchDuration > 0) - properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string(this->_batchDuration); - } - - if (_currentVersion >= 3) - { - ret = peer_->writeUTF(peer_->getURL()); - if (ret <= 0) - { - // tearDown(); - return false; - } - } - - uint32_t size = properties.size(); - ret = peer_->write(size); - if (ret <= 0) - { - // tearDown(); - return false; - } - - std::map<std::string, std::string>::iterator it; - for (it = properties.begin(); it!= properties.end(); it++) - { - ret = peer_->writeUTF(it->first); - if (ret <= 0) - { - // tearDown(); - return false; - } - ret = peer_->writeUTF(it->second); - if (ret <= 0) - { - // tearDown(); - return false; - } - logger_->log_info("Site2Site Protocol Send handshake properties %s %s", it->first.c_str(), it->second.c_str()); - } - - RespondCode code; - std::string message; - - ret = this->readRespond(code, message); - - if (ret <= 0) - { - // tearDown(); - return false; - } - - switch (code) - { - case PROPERTIES_OK: - logger_->log_info("Site2Site HandShake Completed"); - _peerState = HANDSHAKED; - return true; - case PORT_NOT_IN_VALID_STATE: +bool Site2SiteClientProtocol::handShake() { + if (_peerState != ESTABLISHED) { + logger_->log_error( + "Site2Site peer state is not established while handshake"); + return false; + } + logger_->log_info( + "Site2Site Protocol Perform hand shake with destination port %s", + _portIdStr.c_str()); + uuid_t uuid; + // Generate the global UUID for the com identify + uuid_generate(uuid); + char uuidStr[37]; + uuid_unparse_lower(uuid, uuidStr); + _commsIdentifier = uuidStr; + + int ret = peer_->writeUTF(_commsIdentifier); + + if (ret <= 0) { + // tearDown(); + return false; + } + + std::map<std::string, std::string> properties; + properties[HandShakePropertyStr[GZIP]] = "false"; + properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr; + properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string( + this->_timeOut); + if (this->_currentVersion >= 5) { + if (this->_batchCount > 0) + properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string( + this->_batchCount); + if (this->_batchSize > 0) + properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string( + this->_batchSize); + if (this->_batchDuration > 0) + properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string( + this->_batchDuration); + } + + if (_currentVersion >= 3) { + ret = peer_->writeUTF(peer_->getURL()); + if (ret <= 0) { + // tearDown(); + return false; + } + } + + uint32_t size = properties.size(); + ret = peer_->write(size); + if (ret <= 0) { + // tearDown(); + return false; + } + + std::map<std::string, std::string>::iterator it; + for (it = properties.begin(); it != properties.end(); it++) { + ret = peer_->writeUTF(it->first); + if (ret <= 0) { + // tearDown(); + return false; + } + ret = peer_->writeUTF(it->second); + if (ret <= 0) { + // tearDown(); + return false; + } + logger_->log_info("Site2Site Protocol Send handshake properties %s %s", + it->first.c_str(), it->second.c_str()); + } + + RespondCode code; + std::string message; + + ret = this->readRespond(code, message); + + if (ret <= 0) { + // tearDown(); + return false; + } + + switch (code) { + case PROPERTIES_OK: + logger_->log_info("Site2Site HandShake Completed"); + _peerState = HANDSHAKED; + return true; + case PORT_NOT_IN_VALID_STATE: case UNKNOWN_PORT: case PORTS_DESTINATION_FULL: - logger_->log_error("Site2Site HandShake Failed because destination port is either invalid or full"); - ret = -1; - /* - peer_->yield(); - tearDown(); */ - return false; - default: - logger_->log_info("HandShake Failed because of unknown respond code %d", code); - ret = -1; - /* - peer_->yield(); - tearDown(); */ - return false; - } - - return false; + logger_->log_error( + "Site2Site HandShake Failed because destination port is either invalid or full"); + ret = -1; + /* + peer_->yield(); + tearDown(); */ + return false; + default: + logger_->log_info("HandShake Failed because of unknown respond code %d", + code); + ret = -1; + /* + peer_->yield(); + tearDown(); */ + return false; + } + + return false; } -void Site2SiteClientProtocol::tearDown() -{ - if (_peerState >= ESTABLISHED) - { - logger_->log_info("Site2Site Protocol tearDown"); - // need to write shutdown request - writeRequestType(SHUTDOWN); - } - - std::map<std::string, Transaction *>::iterator it; - for (it = _transactionMap.begin(); it!= _transactionMap.end(); it++) - { - delete it->second; - } - _transactionMap.clear(); - peer_->Close(); - _peerState = IDLE; +void Site2SiteClientProtocol::tearDown() { + if (_peerState >= ESTABLISHED) { + logger_->log_info("Site2Site Protocol tearDown"); + // need to write shutdown request + writeRequestType(SHUTDOWN); + } + + std::map<std::string, Transaction *>::iterator it; + for (it = _transactionMap.begin(); it != _transactionMap.end(); it++) { + delete it->second; + } + _transactionMap.clear(); + peer_->Close(); + _peerState = IDLE; } -int Site2SiteClientProtocol::writeRequestType(RequestType type) -{ - if (type >= MAX_REQUEST_TYPE) - return -1; +int Site2SiteClientProtocol::writeRequestType(RequestType type) { + if (type >= MAX_REQUEST_TYPE) + return -1; - return peer_->writeUTF(RequestTypeStr[type]); + return peer_->writeUTF(RequestTypeStr[type]); } -int Site2SiteClientProtocol::readRequestType(RequestType &type) -{ - std::string requestTypeStr; +int Site2SiteClientProtocol::readRequestType(RequestType &type) { + std::string requestTypeStr; - int ret = peer_->readUTF(requestTypeStr); + int ret = peer_->readUTF(requestTypeStr); - if (ret <= 0) - return ret; + if (ret <= 0) + return ret; - for (int i = (int) NEGOTIATE_FLOWFILE_CODEC; i <= (int) SHUTDOWN; i++) - { - if (RequestTypeStr[i] == requestTypeStr) - { - type = (RequestType) i; - return ret; - } - } + for (int i = (int) NEGOTIATE_FLOWFILE_CODEC; i <= (int) SHUTDOWN; i++) { + if (RequestTypeStr[i] == requestTypeStr) { + type = (RequestType) i; + return ret; + } + } - return -1; + return -1; } -int Site2SiteClientProtocol::readRespond(RespondCode &code, std::string &message) -{ - uint8_t firstByte; +int Site2SiteClientProtocol::readRespond(RespondCode &code, + std::string &message) { + uint8_t firstByte; - int ret = peer_->read(firstByte); + int ret = peer_->read(firstByte); - if (ret <= 0 || firstByte != CODE_SEQUENCE_VALUE_1) - return -1; + if (ret <= 0 || firstByte != CODE_SEQUENCE_VALUE_1) + return -1; - uint8_t secondByte; + uint8_t secondByte; - ret = peer_->read(secondByte); + ret = peer_->read(secondByte); - if (ret <= 0 || secondByte != CODE_SEQUENCE_VALUE_2) - return -1; + if (ret <= 0 || secondByte != CODE_SEQUENCE_VALUE_2) + return -1; - uint8_t thirdByte; + uint8_t thirdByte; - ret = peer_->read(thirdByte); + ret = peer_->read(thirdByte); - if (ret <= 0) - return ret; + if (ret <= 0) + return ret; - code = (RespondCode) thirdByte; + code = (RespondCode) thirdByte; - RespondCodeContext *resCode = this->getRespondCodeContext(code); + RespondCodeContext *resCode = this->getRespondCodeContext(code); - if ( resCode == NULL) - { - // Not a valid respond code - return -1; - } - if (resCode->hasDescription) - { - ret = peer_->readUTF(message); - if (ret <= 0) - return -1; - } - return 3 + message.size(); + if (resCode == NULL) { + // Not a valid respond code + return -1; + } + if (resCode->hasDescription) { + ret = peer_->readUTF(message); + if (ret <= 0) + return -1; + } + return 3 + message.size(); } -int Site2SiteClientProtocol::writeRespond(RespondCode code, std::string message) -{ - RespondCodeContext *resCode = this->getRespondCodeContext(code); - - if (resCode == NULL) - { - // Not a valid respond code - return -1; - } - - uint8_t codeSeq[3]; - codeSeq[0] = CODE_SEQUENCE_VALUE_1; - codeSeq[1] = CODE_SEQUENCE_VALUE_2; - codeSeq[2] = (uint8_t) code; - - int ret = peer_->write(codeSeq, 3); - - if (ret != 3) - return -1; - - if (resCode->hasDescription) - { - ret = peer_->writeUTF(message); - if (ret > 0) - return (3 + ret); - else - return ret; - } - else - return 3; +int Site2SiteClientProtocol::writeRespond(RespondCode code, + std::string message) { + RespondCodeContext *resCode = this->getRespondCodeContext(code); + + if (resCode == NULL) { + // Not a valid respond code + return -1; + } + + uint8_t codeSeq[3]; + codeSeq[0] = CODE_SEQUENCE_VALUE_1; + codeSeq[1] = CODE_SEQUENCE_VALUE_2; + codeSeq[2] = (uint8_t) code; + + int ret = peer_->write(codeSeq, 3); + + if (ret != 3) + return -1; + + if (resCode->hasDescription) { + ret = peer_->writeUTF(message); + if (ret > 0) + return (3 + ret); + else + return ret; + } else + return 3; } -bool Site2SiteClientProtocol::negotiateCodec() -{ - if (_peerState != HANDSHAKED) - { - logger_->log_error("Site2Site peer state is not handshaked while negotiate codec"); - return false; - } +bool Site2SiteClientProtocol::negotiateCodec() { + if (_peerState != HANDSHAKED) { + logger_->log_error( + "Site2Site peer state is not handshaked while negotiate codec"); + return false; + } - logger_->log_info("Site2Site Protocol Negotiate Codec with destination port %s", _portIdStr.c_str()); + logger_->log_info( + "Site2Site Protocol Negotiate Codec with destination port %s", + _portIdStr.c_str()); - int status = this->writeRequestType(NEGOTIATE_FLOWFILE_CODEC); + int status = this->writeRequestType(NEGOTIATE_FLOWFILE_CODEC); - if (status <= 0) - { - // tearDown(); - return false; - } + if (status <= 0) { + // tearDown(); + return false; + } - // Negotiate the codec version - bool ret = initiateCodecResourceNegotiation(); + // Negotiate the codec version + bool ret = initiateCodecResourceNegotiation(); - if (!ret) - { - logger_->log_error("Site2Site Codec Version Negotiation failed"); - /* - peer_->yield(); - tearDown(); */ - return false; - } + if (!ret) { + logger_->log_error("Site2Site Codec Version Negotiation failed"); + /* + peer_->yield(); + tearDown(); */ + return false; + } - logger_->log_info("Site2Site Codec Completed and move to READY state for data transfer"); - _peerState = READY; + logger_->log_info( + "Site2Site Codec Completed and move to READY state for data transfer"); + _peerState = READY; - return true; + return true; } -bool Site2SiteClientProtocol::bootstrap() -{ - if (_peerState == READY) - return true; - - tearDown(); - - if (establish() && handShake() && negotiateCodec()) - { - logger_->log_info("Site2Site Ready For data transaction"); - return true; - } - else - { - peer_->yield(); - tearDown(); - return false; - } +bool Site2SiteClientProtocol::bootstrap() { + if (_peerState == READY) + return true; + + tearDown(); + + if (establish() && handShake() && negotiateCodec()) { + logger_->log_info("Site2Site Ready For data transaction"); + return true; + } else { + peer_->yield(); + tearDown(); + return false; + } } -Transaction* Site2SiteClientProtocol::createTransaction(std::string &transactionID, TransferDirection direction) -{ - int ret; - bool dataAvailable; - Transaction *transaction = NULL; - - if (_peerState != READY) - { - bootstrap(); - } - - if (_peerState != READY) - { - return NULL; - } - - if (direction == RECEIVE) - { - ret = writeRequestType(RECEIVE_FLOWFILES); - - if (ret <= 0) - { - // tearDown(); - return NULL; - } - - RespondCode code; - std::string message; - - ret = readRespond(code, message); - - if (ret <= 0) - { - // tearDown(); - return NULL; - } - - CRCStream<Site2SitePeer> crcstream(peer_); - switch (code) - { - case MORE_DATA: - dataAvailable = true; - logger_->log_info("Site2Site peer indicates that data is available"); - transaction = new Transaction(direction,crcstream); - _transactionMap[transaction->getUUIDStr()] = transaction; - transactionID = transaction->getUUIDStr(); - transaction->setDataAvailable(dataAvailable); - logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); - return transaction; - case NO_MORE_DATA: - dataAvailable = false; - logger_->log_info("Site2Site peer indicates that no data is available"); - transaction = new Transaction(direction,crcstream); - _transactionMap[transaction->getUUIDStr()] = transaction; - transactionID = transaction->getUUIDStr(); - transaction->setDataAvailable(dataAvailable); - logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); - return transaction; - default: - logger_->log_info("Site2Site got unexpected response %d when asking for data", code); - // tearDown(); - return NULL; - } - } - else - { - ret = writeRequestType(SEND_FLOWFILES); - - if (ret <= 0) - { - // tearDown(); - return NULL; - } - else - { - CRCStream<Site2SitePeer> crcstream(peer_); - transaction = new Transaction(direction,crcstream); - _transactionMap[transaction->getUUIDStr()] = transaction; - transactionID = transaction->getUUIDStr(); - logger_->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); - return transaction; - } - } +Transaction* Site2SiteClientProtocol::createTransaction( + std::string &transactionID, TransferDirection direction) { + int ret; + bool dataAvailable; + Transaction *transaction = NULL; + + if (_peerState != READY) { + bootstrap(); + } + + if (_peerState != READY) { + return NULL; + } + + if (direction == RECEIVE) { + ret = writeRequestType(RECEIVE_FLOWFILES); + + if (ret <= 0) { + // tearDown(); + return NULL; + } + + RespondCode code; + std::string message; + + ret = readRespond(code, message); + + if (ret <= 0) { + // tearDown(); + return NULL; + } + + org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream(peer_.get()); + switch (code) { + case MORE_DATA: + dataAvailable = true; + logger_->log_info("Site2Site peer indicates that data is available"); + transaction = new Transaction(direction, crcstream); + _transactionMap[transaction->getUUIDStr()] = transaction; + transactionID = transaction->getUUIDStr(); + transaction->setDataAvailable(dataAvailable); + logger_->log_info("Site2Site create transaction %s", + transaction->getUUIDStr().c_str()); + return transaction; + case NO_MORE_DATA: + dataAvailable = false; + logger_->log_info("Site2Site peer indicates that no data is available"); + transaction = new Transaction(direction, crcstream); + _transactionMap[transaction->getUUIDStr()] = transaction; + transactionID = transaction->getUUIDStr(); + transaction->setDataAvailable(dataAvailable); + logger_->log_info("Site2Site create transaction %s", + transaction->getUUIDStr().c_str()); + return transaction; + default: + logger_->log_info( + "Site2Site got unexpected response %d when asking for data", code); + // tearDown(); + return NULL; + } + } else { + ret = writeRequestType(SEND_FLOWFILES); + + if (ret <= 0) { + // tearDown(); + return NULL; + } else { + org::apache::nifi::minifi::io::CRCStream<Site2SitePeer> crcstream(peer_.get()); + transaction = new Transaction(direction, crcstream); + _transactionMap[transaction->getUUIDStr()] = transaction; + transactionID = transaction->getUUIDStr(); + logger_->log_info("Site2Site create transaction %s", + transaction->getUUIDStr().c_str()); + return transaction; + } + } } -bool Site2SiteClientProtocol::receive(std::string transactionID, DataPacket *packet, bool &eof) -{ - int ret; - Transaction *transaction = NULL; - - if (_peerState != READY) - { - bootstrap(); - } - - if (_peerState != READY) - { - return false; - } - - std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); - - if (it == _transactionMap.end()) - { - return false; - } - else - { - transaction = it->second; - } - - if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) - { - logger_->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str()); - return false; - } - - if (transaction->getDirection() != RECEIVE) - { - logger_->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str()); - return false; - } - - if (!transaction->isDataAvailable()) - { - eof = true; - return true; - } - - if (transaction->_transfers > 0) - { - // if we already has transfer before, check to see whether another one is available - RespondCode code; - std::string message; - - ret = readRespond(code, message); - - if (ret <= 0) - { - return false; - } - if (code == CONTINUE_TRANSACTION) - { - logger_->log_info("Site2Site transaction %s peer indicate continue transaction", transactionID.c_str()); - transaction->_dataAvailable = true; - } - else if (code == FINISH_TRANSACTION) - { - logger_->log_info("Site2Site transaction %s peer indicate finish transaction", transactionID.c_str()); - transaction->_dataAvailable = false; - } - else - { - logger_->log_info("Site2Site transaction %s peer indicate wrong respond code %d", transactionID.c_str(), code); - return false; - } - } - - if (!transaction->isDataAvailable()) - { - eof = true; - return true; - } - - // start to read the packet - uint32_t numAttributes; - ret = transaction->getStream().read(numAttributes); - if (ret <= 0 || numAttributes > MAX_NUM_ATTRIBUTES) - { - return false; - } - - // read the attributes - for (unsigned int i = 0; i < numAttributes; i++) - { - std::string key; - std::string value; - ret = transaction->getStream().readUTF(key,true); - if (ret <= 0) - { - return false; - } - ret = transaction->getStream().readUTF(value,true); - if (ret <= 0) - { - return false; - } - packet->_attributes[key] = value; - logger_->log_info("Site2Site transaction %s receives attribute key %s value %s", transactionID.c_str(), key.c_str(), value.c_str()); - } - - uint64_t len; - ret = transaction->getStream().read(len); - if (ret <= 0) - { - return false; - } - - packet->_size = len; - transaction->_transfers++; - transaction->_state = DATA_EXCHANGED; - transaction->_bytes += len; - logger_->log_info("Site2Site transaction %s receives flow record %d, total length %d", transactionID.c_str(), - transaction->_transfers, transaction->_bytes); - - return true; +bool Site2SiteClientProtocol::receive(std::string transactionID, + DataPacket *packet, bool &eof) { + int ret; + Transaction *transaction = NULL; + + if (_peerState != READY) { + bootstrap(); + } + + if (_peerState != READY) { + return false; + } + + std::map<std::string, Transaction *>::iterator it = + this->_transactionMap.find(transactionID); + + if (it == _transactionMap.end()) { + return false; + } else { + transaction = it->second; + } + + if (transaction->getState() != TRANSACTION_STARTED + && transaction->getState() != DATA_EXCHANGED) { + logger_->log_info( + "Site2Site transaction %s is not at started or exchanged state", + transactionID.c_str()); + return false; + } + + if (transaction->getDirection() != RECEIVE) { + logger_->log_info("Site2Site transaction %s direction is wrong", + transactionID.c_str()); + return false; + } + + if (!transaction->isDataAvailable()) { + eof = true; + return true; + } + + if (transaction->_transfers > 0) { + // if we already has transfer before, check to see whether another one is available + RespondCode code; + std::string message; + + ret = readRespond(code, message); + + if (ret <= 0) { + return false; + } + if (code == CONTINUE_TRANSACTION) { + logger_->log_info( + "Site2Site transaction %s peer indicate continue transaction", + transactionID.c_str()); + transaction->_dataAvailable = true; + } else if (code == FINISH_TRANSACTION) { + logger_->log_info( + "Site2Site transaction %s peer indicate finish transaction", + transactionID.c_str()); + transaction->_dataAvailable = false; + } else { + logger_->log_info( + "Site2Site transaction %s peer indicate wrong respond code %d", + transactionID.c_str(), code); + return false; + } + } + + if (!transaction->isDataAvailable()) { + eof = true; + return true; + } + + // start to read the packet + uint32_t numAttributes; + ret = transaction->getStream().read(numAttributes); + if (ret <= 0 || numAttributes > MAX_NUM_ATTRIBUTES) { + return false; + } + + // read the attributes + for (unsigned int i = 0; i < numAttributes; i++) { + std::string key; + std::string value; + ret = transaction->getStream().readUTF(key, true); + if (ret <= 0) { + return false; + } + ret = transaction->getStream().readUTF(value, true); + if (ret <= 0) { + return false; + } + packet->_attributes[key] = value; + logger_->log_info( + "Site2Site transaction %s receives attribute key %s value %s", + transactionID.c_str(), key.c_str(), value.c_str()); + } + + uint64_t len; + ret = transaction->getStream().read(len); + if (ret <= 0) { + return false; + } + + packet->_size = len; + transaction->_transfers++; + transaction->_state = DATA_EXCHANGED; + transaction->_bytes += len; + logger_->log_info( + "Site2Site transaction %s receives flow record %d, total length %d", + transactionID.c_str(), transaction->_transfers, transaction->_bytes); + + return true; } -bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet, FlowFileRecord *flowFile, ProcessSession *session) -{ - int ret; - Transaction *transaction = NULL; - - if (_peerState != READY) - { - bootstrap(); - } - - if (_peerState != READY) - { - return false; - } - - std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); - - if (it == _transactionMap.end()) - { - return false; - } - else - { - transaction = it->second; - } - - if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) - { - logger_->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str()); - return false; - } - - if (transaction->getDirection() != SEND) - { - logger_->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str()); - return false; - } - - if (transaction->_transfers > 0) - { - ret = writeRespond(CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION"); - if (ret <= 0) - { - return false; - } - } - - // start to read the packet - uint32_t numAttributes = packet->_attributes.size(); - ret = transaction->getStream().write(numAttributes); - if (ret != 4) - { - return false; - } - - std::map<std::string, std::string>::iterator itAttribute; - for (itAttribute = packet->_attributes.begin(); itAttribute!= packet->_attributes.end(); itAttribute++) - { - ret = transaction->getStream().writeUTF(itAttribute->first, true); - - if (ret <= 0) - { - return false; - } - ret = transaction->getStream().writeUTF(itAttribute->second, true); - if (ret <= 0) - { - return false; - } - logger_->log_info("Site2Site transaction %s send attribute key %s value %s", transactionID.c_str(), - itAttribute->first.c_str(), itAttribute->second.c_str()); - } - - uint64_t len = flowFile->getSize() ; - ret = transaction->getStream().write(len); - if (ret != 8) - { - return false; - } - - if (flowFile->getSize()) - { - Site2SiteClientProtocol::ReadCallback callback(packet); - session->read(flowFile, &callback); - if (flowFile->getSize() != packet->_size) - { - return false; - } - } - - transaction->_transfers++; - transaction->_state = DATA_EXCHANGED; - transaction->_bytes += len; - logger_->log_info("Site2Site transaction %s send flow record %d, total length %d", transactionID.c_str(), - transaction->_transfers, transaction->_bytes); - - return true; +bool Site2SiteClientProtocol::send( + std::string transactionID, DataPacket *packet, std::shared_ptr<FlowFileRecord> flowFile, + core::ProcessSession *session) { + int ret; + Transaction *transaction = NULL; + + if (_peerState != READY) { + bootstrap(); + } + + if (_peerState != READY) { + return false; + } + + std::map<std::string, Transaction *>::iterator it = + this->_transactionMap.find(transactionID); + + if (it == _transactionMap.end()) { + return false; + } else { + transaction = it->second; + } + + if (transaction->getState() != TRANSACTION_STARTED + && transaction->getState() != DATA_EXCHANGED) { + logger_->log_info( + "Site2Site transaction %s is not at started or exchanged state", + transactionID.c_str()); + return false; + } + + if (transaction->getDirection() != SEND) { + logger_->log_info("Site2Site transaction %s direction is wrong", + transactionID.c_str()); + return false; + } + + if (transaction->_transfers > 0) { + ret = writeRespond(CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION"); + if (ret <= 0) { + return false; + } + } + + // start to read the packet + uint32_t numAttributes = packet->_attributes.size(); + ret = transaction->getStream().write(numAttributes); + if (ret != 4) { + return false; + } + + std::map<std::string, std::string>::iterator itAttribute; + for (itAttribute = packet->_attributes.begin(); + itAttribute != packet->_attributes.end(); itAttribute++) { + ret = transaction->getStream().writeUTF(itAttribute->first, true); + + if (ret <= 0) { + return false; + } + ret = transaction->getStream().writeUTF(itAttribute->second, true); + if (ret <= 0) { + return false; + } + logger_->log_info("Site2Site transaction %s send attribute key %s value %s", + transactionID.c_str(), itAttribute->first.c_str(), + itAttribute->second.c_str()); + } + + uint64_t len = flowFile->getSize(); + ret = transaction->getStream().write(len); + if (ret != 8) { + return false; + } + + if (flowFile->getSize()) { + Site2SiteClientProtocol::ReadCallback callback(packet); + session->read(flowFile, &callback); + if (flowFile->getSize() != packet->_size) { + return false; + } + } + + transaction->_transfers++; + transaction->_state = DATA_EXCHANGED; + transaction->_bytes += len; + logger_->log_info( + "Site2Site transaction %s send flow record %d, total length %d", + transactionID.c_str(), transaction->_transfers, transaction->_bytes); + + return true; } -void Site2SiteClientProtocol::receiveFlowFiles(ProcessContext *context, ProcessSession *session) -{ - uint64_t bytes = 0; - int transfers = 0; - Transaction *transaction = NULL; - - if (_peerState != READY) - { - bootstrap(); - } - - if (_peerState != READY) - { - context->yield(); - tearDown(); - throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer"); - return; - } - - // Create the transaction - std::string transactionID; - transaction = createTransaction(transactionID, RECEIVE); - - if (transaction == NULL) - { - context->yield(); - tearDown(); - throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction"); - return; - } - - try - { - while (true) - { - std::map<std::string, std::string> empty; - uint64_t startTime = getTimeMillis(); - DataPacket packet(this, transaction, empty); - bool eof = false; - - if (!receive(transactionID, &packet, eof)) - { - throw Exception(SITE2SITE_EXCEPTION, "Receive Failed"); - return; - } - if (eof) - { - // transaction done - break; - } - FlowFileRecord *flowFile = session->create(); - if (!flowFile) - { - throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed"); - return; - } - std::map<std::string, std::string>::iterator it; - std::string sourceIdentifier; - for (it = packet._attributes.begin(); it!= packet._attributes.end(); it++) - { - if (it->first == FlowAttributeKey(UUID)) - sourceIdentifier = it->second; - flowFile->addAttribute(it->first, it->second); - } - - if (packet._size > 0) - { - Site2SiteClientProtocol::WriteCallback callback(&packet); - session->write(flowFile, &callback); - if (flowFile->getSize() != packet._size) - { - throw Exception(SITE2SITE_EXCEPTION, "Receive Size Not Right"); - return; - } - } - Relationship relation; // undefined relationship - uint64_t endTime = getTimeMillis(); - std::string transitUri = peer_->getURL() + "/" + sourceIdentifier; - std::string details = "urn:nifi:" + sourceIdentifier + "Remote Host=" + peer_->getHostName(); - session->getProvenanceReporter()->receive(flowFile, transitUri, sourceIdentifier, details, endTime - startTime); - session->transfer(flowFile, relation); - // receive the transfer for the flow record - bytes += packet._size; - transfers++; - } // while true - - if (!confirm(transactionID)) - { - throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed"); - return; - } - if (!complete(transactionID)) - { - throw Exception(SITE2SITE_EXCEPTION, "Complete Transaction Failed"); - return; - } - logger_->log_info("Site2Site transaction %s successfully receive flow record %d, content bytes %d", - transactionID.c_str(), transfers, bytes); - // we yield the receive if we did not get anything - if (transfers == 0) - context->yield(); - } - catch (std::exception &exception) - { - if (transaction) - deleteTransaction(transactionID); - context->yield(); - tearDown(); - logger_->log_debug("Caught Exception %s", exception.what()); - throw; - } - catch (...) - { - if (transaction) - deleteTransaction(transactionID); - context->yield(); - tearDown(); - logger_->log_debug("Caught Exception during Site2SiteClientProtocol::receiveFlowFiles"); - throw; - } - - deleteTransaction(transactionID); - - return; +void Site2SiteClientProtocol::receiveFlowFiles( + core::ProcessContext *context, + core::ProcessSession *session) { + uint64_t bytes = 0; + int transfers = 0; + Transaction *transaction = NULL; + + if (_peerState != READY) { + bootstrap(); + } + + if (_peerState != READY) { + context->yield(); + tearDown(); + throw Exception(SITE2SITE_EXCEPTION, + "Can not establish handshake with peer"); + return; + } + + // Create the transaction + std::string transactionID; + transaction = createTransaction(transactionID, RECEIVE); + + if (transaction == NULL) { + context->yield(); + tearDown(); + throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction"); + return; + } + + try { + while (true) { + std::map<std::string, std::string> empty; + uint64_t startTime = getTimeMillis(); + DataPacket packet(this, transaction, empty); + bool eof = false; + + if (!receive(transactionID, &packet, eof)) { + throw Exception(SITE2SITE_EXCEPTION, "Receive Failed"); + return; + } + if (eof) { + // transaction done + break; + } + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());; + if (!flowFile) { + throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed"); + return; + } + std::map<std::string, std::string>::iterator it; + std::string sourceIdentifier; + for (it = packet._attributes.begin(); it != packet._attributes.end(); + it++) { + if (it->first == FlowAttributeKey(UUID)) + sourceIdentifier = it->second; + flowFile->addAttribute(it->first, it->second); + } + + if (packet._size > 0) { + Site2SiteClientProtocol::WriteCallback callback(&packet); + session->write(flowFile, &callback); + if (flowFile->getSize() != packet._size) { + throw Exception(SITE2SITE_EXCEPTION, "Receive Size Not Right"); + return; + } + } + core::Relationship relation; // undefined relationship + uint64_t endTime = getTimeMillis(); + std::string transitUri = peer_->getURL() + "/" + sourceIdentifier; + std::string details = "urn:nifi:" + sourceIdentifier + "Remote Host=" + + peer_->getHostName(); + session->getProvenanceReporter()->receive(flowFile, transitUri, + sourceIdentifier, details, + endTime - startTime); + session->transfer(flowFile, relation); + // receive the transfer for the flow record + bytes += packet._size; + transfers++; + } // while true + + if (!confirm(transactionID)) { + throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed"); + return; + } + if (!complete(transactionID)) { + throw Exception(SITE2SITE_EXCEPTION, "Complete Transaction Failed"); + return; + } + logger_->log_info( + "Site2Site transaction %s successfully receive flow record %d, content bytes %d", + transactionID.c_str(), transfers, bytes); + // we yield the receive if we did not get anything + if (transfers == 0) + context->yield(); + } catch (std::exception &exception) { + if (transaction) + deleteTransaction(transactionID); + context->yield(); + tearDown(); + logger_->log_debug("Caught Exception %s", exception.what()); + throw; + } catch (...) { + if (transaction) + deleteTransaction(transactionID); + context->yield(); + tearDown(); + logger_->log_debug( + "Caught Exception during Site2SiteClientProtocol::receiveFlowFiles"); + throw; + } + + deleteTransaction(transactionID); + + return; } -bool Site2SiteClientProtocol::confirm(std::string transactionID) -{ - int ret; - Transaction *transaction = NULL; - - if (_peerState != READY) - { - bootstrap(); - } - - if (_peerState != READY) - { - return false; - } - - std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); - - if (it == _transactionMap.end()) - { - return false; - } - else - { - transaction = it->second; - } - - if (transaction->getState() == TRANSACTION_STARTED && !transaction->isDataAvailable() && - transaction->getDirection() == RECEIVE) - { - transaction->_state = TRANSACTION_CONFIRMED; - return true; - } - - if (transaction->getState() != DATA_EXCHANGED) - return false; - - if (transaction->getDirection() == RECEIVE) - { - if (transaction->isDataAvailable()) - return false; - // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message - // to peer so that we can verify that the connection is still open. This is a two-phase commit, - // which helps to prevent the chances of data duplication. Without doing this, we may commit the - // session and then when we send the response back to the peer, the peer may have timed out and may not - // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the - // Critical Section involved in this transaction so that rather than the Critical Section being the - // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. - long crcValue = transaction->getCRC(); - std::string crc = std::to_string(crcValue); - logger_->log_info("Site2Site Send confirm with CRC %d to transaction %s", transaction->getCRC(), - transactionID.c_str()); - ret = writeRespond(CONFIRM_TRANSACTION, crc); - if (ret <= 0) - return false; - RespondCode code; - std::string message; - readRespond(code, message); - if (ret <= 0) - return false; - - if (code == CONFIRM_TRANSACTION) - { - logger_->log_info("Site2Site transaction %s peer confirm transaction", transactionID.c_str()); - transaction->_state = TRANSACTION_CONFIRMED; - return true; - } - else if (code == BAD_CHECKSUM) - { - logger_->log_info("Site2Site transaction %s peer indicate bad checksum", transactionID.c_str()); - /* - transaction->_state = TRANSACTION_CONFIRMED; - return true; */ - return false; - } - else - { - logger_->log_info("Site2Site transaction %s peer unknown respond code %d", - transactionID.c_str(), code); - return false; - } - } - else - { - logger_->log_info("Site2Site Send FINISH TRANSACTION for transaction %s", - transactionID.c_str()); - ret = writeRespond(FINISH_TRANSACTION, "FINISH_TRANSACTION"); - if (ret <= 0) - return false; - RespondCode code; - std::string message; - readRespond(code, message); - if (ret <= 0) - return false; - - // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response - if (code == CONFIRM_TRANSACTION) - { - logger_->log_info("Site2Site transaction %s peer confirm transaction with CRC %s", transactionID.c_str(), message.c_str()); - if (this->_currentVersion > 3) - { - long crcValue = transaction->getCRC(); - std::string crc = std::to_string(crcValue); - if (message == crc) - { - logger_->log_info("Site2Site transaction %s CRC matched", transactionID.c_str()); - ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); - if (ret <= 0) - return false; - transaction->_state = TRANSACTION_CONFIRMED; - return true; - } - else - { - logger_->log_info("Site2Site transaction %s CRC not matched %s", transactionID.c_str(), crc.c_str()); - ret = writeRespond(BAD_CHECKSUM, "BAD_CHECKSUM"); - /* - ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); - if (ret <= 0) - return false; - transaction->_state = TRANSACTION_CONFIRMED; - return true; */ - return false; - } - } - ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); - if (ret <= 0) - return false; - transaction->_state = TRANSACTION_CONFIRMED; - return true; - } - else - { - logger_->log_info("Site2Site transaction %s peer unknown respond code %d", - transactionID.c_str(), code); - return false; - } - return false; - } +bool Site2SiteClientProtocol::confirm(std::string transactionID) { + int ret; + Transaction *transaction = NULL; + + if (_peerState != READY) { + bootstrap(); + } + + if (_peerState != READY) { + return false; + } + + std::map<std::string, Transaction *>::iterator it = + this->_transactionMap.find(transactionID); + + if (it == _transactionMap.end()) { + return false; + } else { + transaction = it->second; + } + + if (transaction->getState() == TRANSACTION_STARTED + && !transaction->isDataAvailable() + && transaction->getDirection() == RECEIVE) { + transaction->_state = TRANSACTION_CONFIRMED; + return true; + } + + if (transaction->getState() != DATA_EXCHANGED) + return false; + + if (transaction->getDirection() == RECEIVE) { + if (transaction->isDataAvailable()) + return false; + // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message + // to peer so that we can verify that the connection is still open. This is a two-phase commit, + // which helps to prevent the chances of data duplication. Without doing this, we may commit the + // session and then when we send the response back to the peer, the peer may have timed out and may not + // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the + // Critical Section involved in this transaction so that rather than the Critical Section being the + // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. + long crcValue = transaction->getCRC(); + std::string crc = std::to_string(crcValue); + logger_->log_info("Site2Site Send confirm with CRC %d to transaction %s", + transaction->getCRC(), transactionID.c_str()); + ret = writeRespond(CONFIRM_TRANSACTION, crc); + if (ret <= 0) + return false; + RespondCode code; + std::string message; + readRespond(code, message); + if (ret <= 0) + return false; + + if (code == CONFIRM_TRANSACTION) { + logger_->log_info("Site2Site transaction %s peer confirm transaction", + transactionID.c_str()); + transaction->_state = TRANSACTION_CONFIRMED; + return true; + } else if (code == BAD_CHECKSUM) { + logger_->log_info("Site2Site transaction %s peer indicate bad checksum", + transactionID.c_str()); + /* + transaction->_state = TRANSACTION_CONFIRMED; + return true; */ + return false; + } else { + logger_->log_info("Site2Site transaction %s peer unknown respond code %d", + transactionID.c_str(), code); + return false; + } + } else { + logger_->log_info("Site2Site Send FINISH TRANSACTION for transaction %s", + transactionID.c_str()); + ret = writeRespond(FINISH_TRANSACTION, "FINISH_TRANSACTION"); + if (ret <= 0) + return false; + RespondCode code; + std::string message; + readRespond(code, message); + if (ret <= 0) + return false; + + // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response + if (code == CONFIRM_TRANSACTION) { + logger_->log_info( + "Site2Site transaction %s peer confirm transaction with CRC %s", + transactionID.c_str(), message.c_str()); + if (this->_currentVersion > 3) { + long crcValue = transaction->getCRC(); + std::string crc = std::to_string(crcValue); + if (message == crc) { + logger_->log_info("Site2Site transaction %s CRC matched", + transactionID.c_str()); + ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); + if (ret <= 0) + return false; + transaction->_state = TRANSACTION_CONFIRMED; + return true; + } else { + logger_->log_info("Site2Site transaction %s CRC not matched %s", + transactionID.c_str(), crc.c_str()); + ret = writeRespond(BAD_CHECKSUM, "BAD_CHECKSUM"); + /* + ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); + if (ret <= 0) + return false; + transaction->_state = TRANSACTION_CONFIRMED; + return true; */ + return false; + } + } + ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); + if (ret <= 0) + return false; + transaction->_state = TRANSACTION_CONFIRMED; + return true; + } else { + logger_->log_info("Site2Site transaction %s peer unknown respond code %d", + transactionID.c_str(), code); + return false; + } + return false; + } } -void Site2SiteClientProtocol::cancel(std::string transactionID) -{ - Transaction *transaction = NULL; - - if (_peerState != READY) - { - return; - } - - std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); - - if (it == _transactionMap.end()) - { - return; - } - else - { - transaction = it->second; - } - - if (transaction->getState() == TRANSACTION_CANCELED || transaction->getState() == TRANSACTION_COMPLETED - || transaction->getState() == TRANSACTION_ERROR) - { - return; - } - - this->writeRespond(CANCEL_TRANSACTION, "Cancel"); - transaction->_state = TRANSACTION_CANCELED; - - tearDown(); - return; +void Site2SiteClientProtocol::cancel(std::string transactionID) { + Transaction *transaction = NULL; + + if (_peerState != READY) { + return; + } + + std::map<std::string, Transaction *>::iterator it = + this->_transactionMap.find(transactionID); + + if (it == _transactionMap.end()) { + return; + } else { + transaction = it->second; + } + + if (transaction->getState() == TRANSACTION_CANCELED + || transaction->getState() == TRANSACTION_COMPLETED + || transaction->getState() == TRANSACTION_ERROR) { + return; + } + + this->writeRespond(CANCEL_TRANSACTION, "Cancel"); + transaction->_state = TRANSACTION_CANCELED; + + tearDown(); + return; } -void Site2SiteClientProtocol::deleteTransaction(std::string transactionID) -{ - Transaction *transaction = NULL; +void Site2SiteClientProtocol::deleteTransaction(std::string transactionID) { + Transaction *transaction = NULL; - std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); + std::map<std::string, Transaction *>::iterator it = + this->_transactionMap.find(transactionID); - if (it == _transactionMap.end()) - { - return; - } - else - { - transaction = it->second; - } + if (it == _transactionMap.end()) { + return; + } else { + transaction = it->second; + } - logger_->log_info("Site2Site delete transaction %s", transaction->getUUIDStr().c_str()); - delete transaction; - _transactionMap.erase(transactionID); + logger_->log_info("Site2Site delete transaction %s", + transaction->getUUIDStr().c_str()); + delete transaction; + _transactionMap.erase(transactionID); } -void Site2SiteClientProtocol::error(std::string transactionID) -{ - Transaction *transaction = NULL; +void Site2SiteClientProtocol::error(std::string transactionID) { + Transaction *transaction = NULL; - std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); + std::map<std::string, Transaction *>::iterator it = + this->_transactionMap.find(transactionID); - if (it == _transactionMap.end()) - { - return; - } - else - { - transaction = it->second; - } + if (it == _transactionMap.end()) { + return; + } else { + transaction = it->second; + } - transaction->_state = TRANSACTION_ERROR; - tearDown(); - return; + transaction->_state = TRANSACTION_ERROR; + tearDown(); + return; } -//! Complete the transaction -bool Site2SiteClientProtocol::complete(std::string transactionID) -{ - int ret; - Transaction *transaction = NULL; - - if (_peerState != READY) - { - bootstrap(); - } - - if (_peerState != READY) - { - return false; - } - - std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); - - if (it == _transactionMap.end()) - { - return false; - } - else - { - transaction = it->second; - } - - if (transaction->getState() != TRANSACTION_CONFIRMED) - { - return false; - } - - if (transaction->getDirection() == RECEIVE) - { - if (transaction->_transfers == 0) - { - transaction->_state = TRANSACTION_COMPLETED; - return true; - } - else - { - logger_->log_info("Site2Site transaction %s send finished", transactionID.c_str()); - ret = this->writeRespond(TRANSACTION_FINISHED, "Finished"); - if (ret <= 0) - return false; - else - { - transaction->_state = TRANSACTION_COMPLETED; - return true; - } - } - } - else - { - RespondCode code; - std::string message; - int ret; - - ret = readRespond(code, message); - - if (ret <= 0) - return false; - - if (code == TRANSACTION_FINISHED) - { - logger_->log_info("Site2Site transaction %s peer finished transaction", transactionID.c_str()); - transaction->_state = TRANSACTION_COMPLETED; - return true; - } - else - { - logger_->log_info("Site2Site transaction %s peer unknown respond code %d", - transactionID.c_str(), code); - return false; - } - } +// Complete the transaction +bool Site2SiteClientProtocol::complete(std::string transactionID) { + int ret; + Transaction *transaction = NULL; + + if (_peerState != READY) { + bootstrap(); + } + + if (_peerState != READY) { + return false; + } + + std::map<std::string, Transaction *>::iterator it = + this->_transactionMap.find(transactionID); + + if (it == _transactionMap.end()) { + return false; + } else { + transaction = it->second; + } + + if (transaction->getState() != TRANSACTION_CONFIRMED) { + return false; + } + + if (transaction->getDirection() == RECEIVE) { + if (transaction->_transfers == 0) { + transaction->_state = TRANSACTION_COMPLETED; + return true; + } else { + logger_->log_info("Site2Site transaction %s send finished", + transactionID.c_str()); + ret = this->writeRespond(TRANSACTION_FINISHED, "Finished"); + if (ret <= 0) + return false; + else { + transaction->_state = TRANSACTION_COMPLETED; + return true; + } + } + } else { + RespondCode code; + std::string message; + int ret; + + ret = readRespond(code, message); + + if (ret <= 0) + return false; + + if (code == TRANSACTION_FINISHED) { + logger_->log_info("Site2Site transaction %s peer finished transaction", + transactionID.c_str()); + transaction->_state = TRANSACTION_COMPLETED; + return true; + } else { + logger_->log_info("Site2Site transaction %s peer unknown respond code %d", + transactionID.c_str(), code); + return false; + } + } } -void Site2SiteClientProtocol::transferFlowFiles(ProcessContext *context, ProcessSession *session) -{ - FlowFileRecord *flow = session->get(); - Transaction *transaction = NULL; - - if (!flow) - return; - - if (_peerState != READY) - { - bootstrap(); - } - - if (_peerState != READY) - { - context->yield(); - tearDown(); - throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer"); - return; - } - - // Create the transaction - std::string transactionID; - transaction = createTransaction(transactionID, SEND); - - if (transaction == NULL) - { - context->yield(); - tearDown(); - throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction"); - return; - } - - bool continueTransaction = true; - uint64_t startSendingNanos = getTimeNano(); - - try - { - while (continueTransaction) - { - uint64_t startTime = getTimeMillis(); - DataPacket packet(this, transaction, flow->getAttributes()); - - if (!send(transactionID, &packet, flow, session)) - { - throw Exception(SITE2SITE_EXCEPTION, "Send Failed"); - return; - } - logger_->log_info("Site2Site transaction %s send flow record %s", - transactionID.c_str(), flow->getUUIDStr().c_str()); - uint64_t endTime = getTimeMillis(); - std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr(); - std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" + peer_->getHostName(); - session->getProvenanceReporter()->send(flow, transitUri, details, endTime - startTime, false); - session->remove(flow); - - uint64_t transferNanos = getTimeNano() - startSendingNanos; - if (transferNanos > _batchSendNanos) - break; - - flow = session->get(); - if (!flow) - { - continueTransaction = false; - } - } // while true - - if (!confirm(transactionID)) - { - throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed"); - return; - } - if (!complete(transactionID)) - { - throw Exception(SITE2SITE_EXCEPTION, "Complete Failed"); - return; - } - logger_->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", - transactionID.c_str(), transaction->_transfers, transaction->_bytes); - } - catch (std::exception &exception) - { - if (transaction) - deleteTransaction(transactionID); - context->yield(); - tearDown(); - logger_->log_debug("Caught Exception %s", exception.what()); - throw; - } - catch (...) - { - if (transaction) - deleteTransaction(transactionID); - context->yield(); - tearDown(); - logger_->log_debug("Caught Exception during Site2SiteClientProtocol::transferFlowFiles"); - throw; - } - - deleteTransaction(transactionID); - - return; +void Site2SiteClientProtocol::transferFlowFiles( + core::ProcessContext *context, + core::ProcessSession *session) { + std::shared_ptr<FlowFileRecord> flow = std::static_pointer_cast<FlowFileRecord>(session->get());; + Transaction *transaction = NULL; + + if (!flow) + return; + + if (_peerState != READY) { + bootstrap(); + } + + if (_peerState != READY) { + context->yield(); + tearDown(); + throw Exception(SITE2SITE_EXCEPTION, + "Can not establish handshake with peer"); + return; + } + + // Create the transaction + std::string transactionID; + transaction = createTransaction(transactionID, SEND); + + if (transaction == NULL) { + context->yield(); + tearDown(); + throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction"); + return; + } + + bool continueTransaction = true; + uint64_t startSendingNanos = getTimeNano(); + + try { + while (continueTransaction) { + uint64_t startTime = getTimeMillis(); + DataPacket packet(this, transaction, flow->getAttributes()); + + if (!send(transactionID, &packet, flow, session)) { + throw Exception(SITE2SITE_EXCEPTION, "Send Failed"); + return; + } + logger_->log_info("Site2Site transaction %s send flow record %s", + transactionID.c_str(), flow->getUUIDStr().c_str()); + uint64_t endTime = getTimeMillis(); + std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr(); + std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" + + peer_->getHostName(); + session->getProvenanceReporter()->send(flow, transitUri, details, + endTime - startTime, false); + session->remove(flow); + + uint64_t transferNanos = getTimeNano() - startSendingNanos; + if (transferNanos > _batchSendNanos) + break; + + flow = std::static_pointer_cast<FlowFileRecord>(session->get());; + if (!flow) { + continueTransaction = false; + } + } // while true + + if (!confirm(transactionID)) { + throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed"); + return; + } + if (!complete(transactionID)) { + throw Exception(SITE2SITE_EXCEPTION, "Complete Failed"); + return; + } + logger_->log_info( + "Site2Site transaction %s successfully send flow record %d, content bytes %d", + transactionID.c_str(), transaction->_transfers, transaction->_bytes); + } catch (std::exception &exception) { + if (transaction) + deleteTransaction(transactionID); + context->yield(); + tearDown(); + logger_->log_debug("Caught Exception %s", exception.what()); + throw; + } catch (...) { + if (transaction) + deleteTransaction(transactionID); + context->yield(); + tearDown(); + logger_->log_debug( + "Caught Exception during Site2SiteClientProtocol::transferFlowFiles"); + throw; + } + + deleteTransaction(transactionID); + + return; } + + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/Site2SitePeer.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Site2SitePeer.cpp b/libminifi/src/Site2SitePeer.cpp index ae3cc2d..64732ac 100644 --- a/libminifi/src/Site2SitePeer.cpp +++ b/libminifi/src/Site2SitePeer.cpp @@ -31,6 +31,12 @@ #include "Site2SitePeer.h" #include "FlowController.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + bool Site2SitePeer::Open() { if (IsNullOrEmpty (host_)) @@ -53,3 +59,9 @@ void Site2SitePeer::Close() { stream_->closeStream(); } + + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/TailFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/TailFile.cpp b/libminifi/src/TailFile.cpp deleted file mode 100644 index 36b5e53..0000000 --- a/libminifi/src/TailFile.cpp +++ /dev/null @@ -1,269 +0,0 @@ -/** - * @file TailFile.cpp - * TailFile class implementation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include <vector> -#include <queue> -#include <map> -#include <set> -#include <sys/time.h> -#include <sys/types.h> -#include <sys/stat.h> -#include <time.h> -#include <sstream> -#include <stdio.h> -#include <string> -#include <iostream> -#include <dirent.h> -#include <limits.h> -#include <unistd.h> - -#include "utils/TimeUtil.h" -#include "utils/StringUtils.h" -#include "TailFile.h" -#include "ProcessContext.h" -#include "ProcessSession.h" - -const std::string TailFile::ProcessorName("TailFile"); -Property TailFile::FileName("File to Tail", "Fully-qualified filename of the file that should be tailed", ""); -Property TailFile::StateFile("State File", - "Specifies the file that should be used for storing state about what data has been ingested so that upon restart NiFi can resume from where it left off", "TailFileState"); -Relationship TailFile::Success("success", "All files are routed to success"); - -void TailFile::initialize() -{ - //! Set the supported properties - std::set<Property> properties; - properties.insert(FileName); - properties.insert(StateFile); - setSupportedProperties(properties); - //! Set the supported relationships - std::set<Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); -} - -std::string TailFile::trimLeft(const std::string& s) -{ - return StringUtils::trimLeft(s); -} - -std::string TailFile::trimRight(const std::string& s) -{ - return StringUtils::trimRight(s); -} - -void TailFile::parseStateFileLine(char *buf) -{ - char *line = buf; - - while ((line[0] == ' ') || (line[0] =='\t')) - ++line; - - char first = line[0]; - if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') || (first == '=')) - { - return; - } - - char *equal = strchr(line, '='); - if (equal == NULL) - { - return; - } - - equal[0] = '\0'; - std::string key = line; - - equal++; - while ((equal[0] == ' ') || (equal[0] == '\t')) - ++equal; - - first = equal[0]; - if ((first == '\0') || (first == '\r') || (first== '\n')) - { - return; - } - - std::string value = equal; - key = trimRight(key); - value = trimRight(value); - - if (key == "FILENAME") - this->_currentTailFileName = value; - if (key == "POSITION") - this->_currentTailFilePosition = std::stoi(value); - - return; -} - -void TailFile::recoverState() -{ - std::ifstream file(_stateFile.c_str(), std::ifstream::in); - if (!file.good()) - { - logger_->log_error("load state file failed %s", _stateFile.c_str()); - return; - } - const unsigned int bufSize = 512; - char buf[bufSize]; - for (file.getline(buf,bufSize); file.good(); file.getline(buf,bufSize)) - { - parseStateFileLine(buf); - } -} - -void TailFile::storeState() -{ - std::ofstream file(_stateFile.c_str()); - if (!file.is_open()) - { - logger_->log_error("store state file failed %s", _stateFile.c_str()); - return; - } - file << "FILENAME=" << this->_currentTailFileName << "\n"; - file << "POSITION=" << this->_currentTailFilePosition << "\n"; - file.close(); -} - -static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j) -{ - return (i.modifiedTime < j.modifiedTime); -} -void TailFile::checkRollOver() -{ - struct stat statbuf; - std::vector<TailMatchedFileItem> matchedFiles; - std::string fullPath = this->_fileLocation + "/" + _currentTailFileName; - - if (stat(fullPath.c_str(), &statbuf) == 0) - { - if (statbuf.st_size > this->_currentTailFilePosition) - // there are new input for the current tail file - return; - - uint64_t modifiedTimeCurrentTailFile = ((uint64_t) (statbuf.st_mtime) * 1000); - std::string pattern = _fileName; - std::size_t found = _fileName.find_last_of("."); - if (found != std::string::npos) - pattern = _fileName.substr(0,found); - DIR *d; - d = opendir(this->_fileLocation.c_str()); - if (!d) - return; - while (1) - { - struct dirent *entry; - entry = readdir(d); - if (!entry) - break; - std::string d_name = entry->d_name; - if (!(entry->d_type & DT_DIR)) - { - std::string fileName = d_name; - std::string fileFullName = this->_fileLocation + "/" + d_name; - if (fileFullName.find(pattern) != std::string::npos && stat(fileFullName.c_str(), &statbuf) == 0) - { - if (((uint64_t) (statbuf.st_mtime) * 1000) >= modifiedTimeCurrentTailFile) - { - TailMatchedFileItem item; - item.fileName = fileName; - item.modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000); - matchedFiles.push_back(item); - } - } - } - } - closedir(d); - - // Sort the list based on modified time - std::sort(matchedFiles.begin(), matchedFiles.end(), sortTailMatchedFileItem); - for (std::vector<TailMatchedFileItem>::iterator it = matchedFiles.begin(); it!=matchedFiles.end(); ++it) - { - TailMatchedFileItem item = *it; - if (item.fileName == _currentTailFileName) - { - ++it; - if (it!=matchedFiles.end()) - { - TailMatchedFileItem nextItem = *it; - logger_->log_info("TailFile File Roll Over from %s to %s", _currentTailFileName.c_str(), nextItem.fileName.c_str()); - _currentTailFileName = nextItem.fileName; - _currentTailFilePosition = 0; - storeState(); - } - break; - } - } - } - else - return; -} - - -void TailFile::onTrigger(ProcessContext *context, ProcessSession *session) -{ - std::string value; - if (context->getProperty(FileName.getName(), value)) - { - std::size_t found = value.find_last_of("/\\"); - this->_fileLocation = value.substr(0,found); - this->_fileName = value.substr(found+1); - } - if (context->getProperty(StateFile.getName(), value)) - { - _stateFile = value + "." + getUUIDStr(); - } - if (!this->_stateRecovered) - { - _stateRecovered = true; - this->_currentTailFileName = _fileName; - this->_currentTailFilePosition = 0; - // recover the state if we have not done so - this->recoverState(); - } - checkRollOver(); - std::string fullPath = this->_fileLocation + "/" + _currentTailFileName; - struct stat statbuf; - if (stat(fullPath.c_str(), &statbuf) == 0) - { - if (statbuf.st_size <= this->_currentTailFilePosition) - // there are no new input for the current tail file - { - context->yield(); - return; - } - FlowFileRecord *flowFile = session->create(); - if (!flowFile) - return; - std::size_t found = _currentTailFileName.find_last_of("."); - std::string baseName = _currentTailFileName.substr(0,found); - std::string extension = _currentTailFileName.substr(found+1); - flowFile->updateAttribute(PATH, _fileLocation); - flowFile->addAttribute(ABSOLUTE_PATH, fullPath); - session->import(fullPath, flowFile, true, this->_currentTailFilePosition); - session->transfer(flowFile, Success); - logger_->log_info("TailFile %s for %d bytes", _currentTailFileName.c_str(), flowFile->getSize()); - std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" + - std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." + extension; - flowFile->updateAttribute(FILENAME, logName); - this->_currentTailFilePosition += flowFile->getSize(); - storeState(); - } -} - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/ThreadedSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp index 2008fec..6c04281 100644 --- a/libminifi/src/ThreadedSchedulingAgent.cpp +++ b/libminifi/src/ThreadedSchedulingAgent.cpp @@ -22,103 +22,125 @@ #include "ThreadedSchedulingAgent.h" -#include "ProcessContext.h" -#include "ProcessSession.h" -#include "ProcessSessionFactory.h" - -void ThreadedSchedulingAgent::schedule(Processor *processor) -{ - std::lock_guard<std::mutex> lock(_mtx); - - _administrativeYieldDuration = 0; - std::string yieldValue; - - if (configure_->get(Configure::nifi_administrative_yield_duration, yieldValue)) - { - TimeUnit unit; - if (Property::StringToTime(yieldValue, _administrativeYieldDuration, unit) && - Property::ConvertTimeUnitToMS(_administrativeYieldDuration, unit, _administrativeYieldDuration)) - { - logger_->log_debug("nifi_administrative_yield_duration: [%d] ms", _administrativeYieldDuration); - } - } - - _boredYieldDuration = 0; - if (configure_->get(Configure::nifi_bored_yield_duration, yieldValue)) - { - TimeUnit unit; - if (Property::StringToTime(yieldValue, _boredYieldDuration, unit) && - Property::ConvertTimeUnitToMS(_boredYieldDuration, unit, _boredYieldDuration)) - { - logger_->log_debug("nifi_bored_yield_duration: [%d] ms", _boredYieldDuration); - } - } - - if (processor->getScheduledState() != RUNNING) - { - logger_->log_info("Can not schedule threads for processor %s because it is not running", processor->getName().c_str()); - return; - } - - std::map<std::string, std::vector<std::thread *>>::iterator it = - _threads.find(processor->getUUIDStr()); - if (it != _threads.end()) - { - logger_->log_info("Can not schedule threads for processor %s because there are existing threads running"); - return; - } - - auto processContext = std::make_shared<ProcessContext>(processor); - auto sessionFactory = std::make_shared<ProcessSessionFactory>(processContext.get()); - - processor->onSchedule(processContext.get(), sessionFactory.get()); - - std::vector<std::thread *> threads; - for (int i = 0; i < processor->getMaxConcurrentTasks(); i++) - { - ThreadedSchedulingAgent *agent = this; - std::thread *thread = new std::thread([agent, processor, processContext, sessionFactory] () { - agent->run(processor, processContext.get(), sessionFactory.get()); - }); - thread->detach(); - threads.push_back(thread); - logger_->log_info("Scheduled thread %d running for process %s", thread->get_id(), - processor->getName().c_str()); - } - _threads[processor->getUUIDStr().c_str()] = threads; - - return; +#include "core/Connectable.h" +#include "core/ProcessorNode.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessSessionFactory.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { + +void ThreadedSchedulingAgent::schedule( + std::shared_ptr<core::Processor> processor) { + std::lock_guard < std::mutex > lock(mutex_); + + _administrativeYieldDuration = 0; + std::string yieldValue; + + if (configure_->get(Configure::nifi_administrative_yield_duration, + yieldValue)) { + core::TimeUnit unit; + if (core::Property::StringToTime( + yieldValue, _administrativeYieldDuration, unit) + && core::Property::ConvertTimeUnitToMS( + _administrativeYieldDuration, unit, _administrativeYieldDuration)) { + logger_->log_debug("nifi_administrative_yield_duration: [%d] ms", + _administrativeYieldDuration); + } + } + + _boredYieldDuration = 0; + if (configure_->get(Configure::nifi_bored_yield_duration, yieldValue)) { + core::TimeUnit unit; + if (core::Property::StringToTime( + yieldValue, _boredYieldDuration, unit) + && core::Property::ConvertTimeUnitToMS( + _boredYieldDuration, unit, _boredYieldDuration)) { + logger_->log_debug("nifi_bored_yield_duration: [%d] ms", + _boredYieldDuration); + } + } + + if (processor->getScheduledState() != core::RUNNING) { + logger_->log_info( + "Can not schedule threads for processor %s because it is not running", + processor->getName().c_str()); + return; + } + + std::map<std::string, std::vector<std::thread *>>::iterator it = + _threads.find(processor->getUUIDStr()); + if (it != _threads.end()) { + logger_->log_info( + "Can not schedule threads for processor %s because there are existing threads running"); + return; + } + + core::ProcessorNode processor_node(processor); + auto processContext = std::make_shared + < core::ProcessContext > (processor_node,repo_); + auto sessionFactory = std::make_shared + < core::ProcessSessionFactory + > (processContext.get()); + + processor->onSchedule(processContext.get(), sessionFactory.get()); + + std::vector<std::thread *> threads; + for (int i = 0; i < processor->getMaxConcurrentTasks(); i++) { + ThreadedSchedulingAgent *agent = this; + std::thread *thread = new std::thread( + [agent, processor, processContext, sessionFactory] () { + agent->run(processor, processContext.get(), sessionFactory.get()); + }); + thread->detach(); + threads.push_back(thread); + logger_->log_info("Scheduled thread %d running for process %s", + thread->get_id(), processor->getName().c_str()); + } + _threads[processor->getUUIDStr().c_str()] = threads; + + return; } -void ThreadedSchedulingAgent::unschedule(Processor *processor) -{ - std::lock_guard<std::mutex> lock(_mtx); - - logger_->log_info("Shutting down threads for processor %s/%s", - processor->getName().c_str(), - processor->getUUIDStr().c_str()); - - if (processor->getScheduledState() != RUNNING) - { - logger_->log_info("Cannot unschedule threads for processor %s because it is not running", processor->getName().c_str()); - return; - } - - std::map<std::string, std::vector<std::thread *>>::iterator it = - _threads.find(processor->getUUIDStr()); - - if (it == _threads.end()) - { - logger_->log_info("Cannot unschedule threads for processor %s because there are no existing threads running", processor->getName().c_str()); - return; - } - for (std::vector<std::thread *>::iterator itThread = it->second.begin(); itThread != it->second.end(); ++itThread) - { - std::thread *thread = *itThread; - logger_->log_info("Scheduled thread %d deleted for process %s", thread->get_id(), - processor->getName().c_str()); - delete thread; - } - _threads.erase(processor->getUUIDStr()); - processor->clearActiveTask(); +void ThreadedSchedulingAgent::unschedule(std::shared_ptr<core::Processor> processor) { + std::lock_guard < std::mutex > lock(mutex_); + + logger_->log_info("Shutting down threads for processor %s/%s", + processor->getName().c_str(), + processor->getUUIDStr().c_str()); + + if (processor->getScheduledState() != core::RUNNING) { + logger_->log_info( + "Cannot unschedule threads for processor %s because it is not running", + processor->getName().c_str()); + return; + } + + std::map<std::string, std::vector<std::thread *>>::iterator it = + _threads.find(processor->getUUIDStr()); + + if (it == _threads.end()) { + logger_->log_info( + "Cannot unschedule threads for processor %s because there are no existing threads running", + processor->getName().c_str()); + return; + } + for (std::vector<std::thread *>::iterator itThread = it->second.begin(); + itThread != it->second.end(); ++itThread) { + std::thread *thread = *itThread; + logger_->log_info("Scheduled thread %d deleted for process %s", + thread->get_id(), processor->getName().c_str()); + delete thread; + } + _threads.erase(processor->getUUIDStr()); + processor->clearActiveTask(); } + + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/44704b36/libminifi/src/TimerDrivenSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/TimerDrivenSchedulingAgent.cpp b/libminifi/src/TimerDrivenSchedulingAgent.cpp index 30dc96c..3895e81 100644 --- a/libminifi/src/TimerDrivenSchedulingAgent.cpp +++ b/libminifi/src/TimerDrivenSchedulingAgent.cpp @@ -20,26 +20,38 @@ #include <chrono> #include <thread> #include <iostream> -#include "Property.h" #include "TimerDrivenSchedulingAgent.h" +#include "core/Property.h" -void TimerDrivenSchedulingAgent::run(Processor *processor, ProcessContext *processContext, ProcessSessionFactory *sessionFactory) -{ - while (this->_running) - { - bool shouldYield = this->onTrigger(processor, processContext, sessionFactory); +namespace org { +namespace apache { +namespace nifi { +namespace minifi { - if (processor->isYield()) - { - // Honor the yield - std::this_thread::sleep_for(std::chrono::milliseconds(processor->getYieldTime())); - } - else if (shouldYield && this->_boredYieldDuration > 0) - { - // No work to do or need to apply back pressure - std::this_thread::sleep_for(std::chrono::milliseconds(this->_boredYieldDuration)); - } - std::this_thread::sleep_for(std::chrono::nanoseconds(processor->getSchedulingPeriodNano())); - } - return; +void TimerDrivenSchedulingAgent::run( + std::shared_ptr<core::Processor> processor, + core::ProcessContext *processContext, + core::ProcessSessionFactory *sessionFactory) { + while (this->running_) { + bool shouldYield = this->onTrigger(processor, processContext, + sessionFactory); + + if (processor->isYield()) { + // Honor the yield + std::this_thread::sleep_for( + std::chrono::milliseconds(processor->getYieldTime())); + } else if (shouldYield && this->_boredYieldDuration > 0) { + // No work to do or need to apply back pressure + std::this_thread::sleep_for( + std::chrono::milliseconds(this->_boredYieldDuration)); + } + std::this_thread::sleep_for( + std::chrono::nanoseconds(processor->getSchedulingPeriodNano())); + } + return; } + +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */
