This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch MINIFICPP-1507 in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit ee2ac71af1822c1da5b78bf6d3a032c3543546a6 Author: Marton Szasz <[email protected]> AuthorDate: Tue Mar 16 11:58:51 2021 +0100 fix s2s issues --- libminifi/src/io/BufferStream.cpp | 2 +- libminifi/src/sitetosite/RawSocketProtocol.cpp | 133 ++++++++++++++----------- libminifi/src/sitetosite/SiteToSiteClient.cpp | 14 +-- 3 files changed, 78 insertions(+), 71 deletions(-) diff --git a/libminifi/src/io/BufferStream.cpp b/libminifi/src/io/BufferStream.cpp index 12220ef..1dd64ea 100644 --- a/libminifi/src/io/BufferStream.cpp +++ b/libminifi/src/io/BufferStream.cpp @@ -39,7 +39,7 @@ int BufferStream::write(const uint8_t *value, int size) { size_t BufferStream::read(uint8_t *buf, size_t len) { const auto bytes_available_in_buffer = buffer_.size() - readOffset_; const auto readlen = std::min(len, bytes_available_in_buffer); - auto begin = buffer_.begin() + gsl::narrow<decltype(buffer_)::difference_type>(readOffset_); + const auto begin = buffer_.begin() + gsl::narrow<decltype(buffer_)::difference_type>(readOffset_); std::copy(begin, begin + gsl::narrow<decltype(buffer_)::difference_type>(readlen), buf); // increase offset for the next read diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp index 0eb1f7e..c75eb17 100644 --- a/libminifi/src/sitetosite/RawSocketProtocol.cpp +++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp @@ -114,28 +114,31 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() { logger_->log_debug("Negotiate protocol version with destination port %s current version %d", port_id_.to_string(), _currentVersion); - int ret = peer_->write(getResourceName()); - - logger_->log_trace("result of writing resource name is %i", ret); - if (ret <= 0) { - logger_->log_debug("result of writing resource name is %i", ret); - // tearDown(); - return false; + { + const auto ret = peer_->write(getResourceName()); + logger_->log_trace("result of writing resource name is %i", ret); + if (ret <= 0) { + logger_->log_debug("result of writing resource name is %i", ret); + // tearDown(); + return false; + } } - ret = peer_->write(_currentVersion); - - if (ret <= 0) { - logger_->log_debug("result of writing version is %i", ret); - return false; + { + const auto ret = peer_->write(_currentVersion); + if (ret <= 0) { + logger_->log_debug("result of writing version is %i", ret); + return false; + } } uint8_t statusCode; - ret = peer_->read(statusCode); - - if (ret <= 0) { - logger_->log_debug("result of writing version status code %i", ret); - return false; + { + const auto ret = peer_->read(statusCode); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + logger_->log_debug("result of writing version status code %i", ret); + return false; + } } logger_->log_debug("status code is %i", statusCode); switch (statusCode) { @@ -144,9 +147,11 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() { return true; case DIFFERENT_RESOURCE_VERSION: uint32_t serverVersion; - ret = peer_->read(serverVersion); - if (ret <= 0) { - return false; + { + const auto ret = peer_->read(serverVersion); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } logging::LOG_INFO(logger_) << "Site2Site Server Response asked for a different protocol version " << serverVersion; @@ -178,36 +183,40 @@ bool RawSiteToSiteClient::initiateCodecResourceNegotiation() { logger_->log_trace("Negotiate Codec version with destination port %s current version %d", port_id_.to_string(), _currentCodecVersion); - int ret = peer_->write(getCodecResourceName()); - - if (ret <= 0) { - logger_->log_debug("result of getCodecResourceName is %i", ret); - return false; + { + const auto ret = peer_->write(getCodecResourceName()); + if (ret <= 0) { + logger_->log_debug("result of getCodecResourceName is %i", ret); + return false; + } } - ret = peer_->write(_currentCodecVersion); - - if (ret <= 0) { - logger_->log_debug("result of _currentCodecVersion is %i", ret); - return false; + { + const auto ret = peer_->write(_currentCodecVersion); + if (ret <= 0) { + logger_->log_debug("result of _currentCodecVersion is %i", ret); + return false; + } } uint8_t statusCode; - ret = peer_->read(statusCode); - - if (ret <= 0) { - return false; + { + const auto ret = peer_->read(statusCode); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } - switch (statusCode) { case RESOURCE_OK: logger_->log_trace("Site2Site Codec Negotiate version OK"); return true; case DIFFERENT_RESOURCE_VERSION: uint32_t serverVersion; - ret = peer_->read(serverVersion); - if (ret <= 0) { - return false; + { + const auto ret = peer_->read(serverVersion); + if (ret == 0 || ret == static_cast<size_t>(-1)) { + return false; + } } logging::LOG_INFO(logger_) << "Site2Site Server Response asked for a different protocol version " << serverVersion; @@ -237,10 +246,11 @@ bool RawSiteToSiteClient::handShake() { logger_->log_debug("Site2Site Protocol Perform hand shake with destination port %s", port_id_.to_string()); _commsIdentifier = id_generator_->generate(); - int ret = peer_->write(_commsIdentifier); - - if (ret <= 0) { - return false; + { + const auto ret = peer_->write(_commsIdentifier); + if (ret <= 0) { + return false; + } } std::map<std::string, std::string> properties; @@ -257,27 +267,33 @@ bool RawSiteToSiteClient::handShake() { } if (_currentVersion >= 3) { - ret = peer_->write(peer_->getURL()); + const auto ret = peer_->write(peer_->getURL()); if (ret <= 0) { return false; } } - uint32_t size = gsl::narrow<uint32_t>(properties.size()); - ret = peer_->write(size); - if (ret <= 0) { - return false; + { + const auto size = gsl::narrow<uint32_t>(properties.size()); + const auto ret = peer_->write(size); + if (ret <= 0) { + return false; + } } std::map<std::string, std::string>::iterator it; for (it = properties.begin(); it != properties.end(); it++) { - ret = peer_->write(it->first); - if (ret <= 0) { - return false; + { + const auto ret = peer_->write(it->first); + if (ret <= 0) { + return false; + } } - ret = peer_->write(it->second); - if (ret <= 0) { - return false; + { + const auto ret = peer_->write(it->second); + if (ret <= 0) { + return false; + } } logger_->log_debug("Site2Site Protocol Send handshake properties %s %s", it->first, it->second); } @@ -285,10 +301,11 @@ bool RawSiteToSiteClient::handShake() { RespondCode code; std::string message; - ret = readRespond(nullptr, code, message); - - if (ret <= 0) { - return false; + { + const auto ret = readRespond(nullptr, code, message); + if (ret <= 0) { + return false; + } } std::string error; @@ -310,13 +327,11 @@ bool RawSiteToSiteClient::handShake() { // Unknown error default: logger_->log_error("HandShake Failed because of unknown respond code %d", code); - ret = -1; return false; } // All known error cases handled here logger_->log_error("Site2Site HandShake Failed because destination port, %s, is %s", port_id_.to_string(), error); - ret = -1; return false; } diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp index fb24f73..b009c18 100644 --- a/libminifi/src/sitetosite/SiteToSiteClient.cpp +++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp @@ -31,28 +31,21 @@ namespace sitetosite { int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& /*transaction*/, RespondCode &code, std::string &message) { uint8_t firstByte; - auto ret = peer_->read(firstByte); - - if (ret <= 0 || firstByte != CODE_SEQUENCE_VALUE_1) + if (ret == 0 || ret == static_cast<size_t>(-1) || firstByte != CODE_SEQUENCE_VALUE_1) return -1; uint8_t secondByte; - ret = peer_->read(secondByte); - - if (ret <= 0 || secondByte != CODE_SEQUENCE_VALUE_2) + if (ret == 0 || ret == static_cast<size_t>(-1) || secondByte != CODE_SEQUENCE_VALUE_2) return -1; uint8_t thirdByte; - ret = peer_->read(thirdByte); - - if (ret != static_cast<size_t>(-1)) + if (ret == static_cast<size_t>(-1)) return gsl::narrow_cast<int>(ret); code = (RespondCode) thirdByte; - RespondCodeContext *resCode = this->getRespondCodeContext(code); if (!resCode) { return -1; @@ -69,7 +62,6 @@ void SiteToSiteClient::deleteTransaction(const utils::Identifier& transactionID) std::shared_ptr<Transaction> transaction; auto it = this->known_transactions_.find(transactionID); - if (it == known_transactions_.end()) { return; } else {
