This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit ee2ac71af1822c1da5b78bf6d3a032c3543546a6
Author: Marton Szasz <[email protected]>
AuthorDate: Tue Mar 16 11:58:51 2021 +0100

    fix s2s issues
---
 libminifi/src/io/BufferStream.cpp              |   2 +-
 libminifi/src/sitetosite/RawSocketProtocol.cpp | 133 ++++++++++++++-----------
 libminifi/src/sitetosite/SiteToSiteClient.cpp  |  14 +--
 3 files changed, 78 insertions(+), 71 deletions(-)

diff --git a/libminifi/src/io/BufferStream.cpp b/libminifi/src/io/BufferStream.cpp
index 12220ef..1dd64ea 100644
--- a/libminifi/src/io/BufferStream.cpp
+++ b/libminifi/src/io/BufferStream.cpp
@@ -39,7 +39,7 @@ int BufferStream::write(const uint8_t *value, int size) {
 size_t BufferStream::read(uint8_t *buf, size_t len) {
   const auto bytes_available_in_buffer = buffer_.size() - readOffset_;
   const auto readlen = std::min(len, bytes_available_in_buffer);
-  auto begin = buffer_.begin() + 
gsl::narrow<decltype(buffer_)::difference_type>(readOffset_);
+  const auto begin = buffer_.begin() + 
gsl::narrow<decltype(buffer_)::difference_type>(readOffset_);
   std::copy(begin, begin + 
gsl::narrow<decltype(buffer_)::difference_type>(readlen), buf);
 
   // increase offset for the next read
diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp
index 0eb1f7e..c75eb17 100644
--- a/libminifi/src/sitetosite/RawSocketProtocol.cpp
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -114,28 +114,31 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() {
 
   logger_->log_debug("Negotiate protocol version with destination port %s 
current version %d", port_id_.to_string(), _currentVersion);
 
-  int ret = peer_->write(getResourceName());
-
-  logger_->log_trace("result of writing resource name is %i", ret);
-  if (ret <= 0) {
-    logger_->log_debug("result of writing resource name is %i", ret);
-    // tearDown();
-    return false;
+  {
+    const auto ret = peer_->write(getResourceName());
+    logger_->log_trace("result of writing resource name is %i", ret);
+    if (ret <= 0) {
+      logger_->log_debug("result of writing resource name is %i", ret);
+      // tearDown();
+      return false;
+    }
   }
 
-  ret = peer_->write(_currentVersion);
-
-  if (ret <= 0) {
-    logger_->log_debug("result of writing version is %i", ret);
-    return false;
+  {
+    const auto ret = peer_->write(_currentVersion);
+    if (ret <= 0) {
+      logger_->log_debug("result of writing version is %i", ret);
+      return false;
+    }
   }
 
   uint8_t statusCode;
-  ret = peer_->read(statusCode);
-
-  if (ret <= 0) {
-    logger_->log_debug("result of writing version status code  %i", ret);
-    return false;
+  {
+    const auto ret = peer_->read(statusCode);
+    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      logger_->log_debug("result of writing version status code  %i", ret);
+      return false;
+    }
   }
   logger_->log_debug("status code is %i", statusCode);
   switch (statusCode) {
@@ -144,9 +147,11 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() {
       return true;
     case DIFFERENT_RESOURCE_VERSION:
       uint32_t serverVersion;
-      ret = peer_->read(serverVersion);
-      if (ret <= 0) {
-        return false;
+      {
+        const auto ret = peer_->read(serverVersion);
+        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+          return false;
+        }
       }
 
       logging::LOG_INFO(logger_) << "Site2Site Server Response asked for a 
different protocol version " << serverVersion;
@@ -178,36 +183,40 @@ bool RawSiteToSiteClient::initiateCodecResourceNegotiation() {
 
   logger_->log_trace("Negotiate Codec version with destination port %s current 
version %d", port_id_.to_string(), _currentCodecVersion);
 
-  int ret = peer_->write(getCodecResourceName());
-
-  if (ret <= 0) {
-    logger_->log_debug("result of getCodecResourceName is %i", ret);
-    return false;
+  {
+    const auto ret = peer_->write(getCodecResourceName());
+    if (ret <= 0) {
+      logger_->log_debug("result of getCodecResourceName is %i", ret);
+      return false;
+    }
   }
 
-  ret = peer_->write(_currentCodecVersion);
-
-  if (ret <= 0) {
-    logger_->log_debug("result of _currentCodecVersion is %i", ret);
-    return false;
+  {
+    const auto ret = peer_->write(_currentCodecVersion);
+    if (ret <= 0) {
+      logger_->log_debug("result of _currentCodecVersion is %i", ret);
+      return false;
+    }
   }
 
   uint8_t statusCode;
-  ret = peer_->read(statusCode);
-
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = peer_->read(statusCode);
+    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      return false;
+    }
   }
-
   switch (statusCode) {
     case RESOURCE_OK:
       logger_->log_trace("Site2Site Codec Negotiate version OK");
       return true;
     case DIFFERENT_RESOURCE_VERSION:
       uint32_t serverVersion;
-      ret = peer_->read(serverVersion);
-      if (ret <= 0) {
-        return false;
+      {
+        const auto ret = peer_->read(serverVersion);
+        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+          return false;
+        }
       }
       logging::LOG_INFO(logger_) << "Site2Site Server Response asked for a 
different protocol version " << serverVersion;
 
@@ -237,10 +246,11 @@ bool RawSiteToSiteClient::handShake() {
   logger_->log_debug("Site2Site Protocol Perform hand shake with destination 
port %s", port_id_.to_string());
   _commsIdentifier = id_generator_->generate();
 
-  int ret = peer_->write(_commsIdentifier);
-
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = peer_->write(_commsIdentifier);
+    if (ret <= 0) {
+      return false;
+    }
   }
 
   std::map<std::string, std::string> properties;
@@ -257,27 +267,33 @@ bool RawSiteToSiteClient::handShake() {
   }
 
   if (_currentVersion >= 3) {
-    ret = peer_->write(peer_->getURL());
+    const auto ret = peer_->write(peer_->getURL());
     if (ret <= 0) {
       return false;
     }
   }
 
-  uint32_t size = gsl::narrow<uint32_t>(properties.size());
-  ret = peer_->write(size);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto size = gsl::narrow<uint32_t>(properties.size());
+    const auto ret = peer_->write(size);
+    if (ret <= 0) {
+      return false;
+    }
   }
 
   std::map<std::string, std::string>::iterator it;
   for (it = properties.begin(); it != properties.end(); it++) {
-    ret = peer_->write(it->first);
-    if (ret <= 0) {
-      return false;
+    {
+      const auto ret = peer_->write(it->first);
+      if (ret <= 0) {
+        return false;
+      }
     }
-    ret = peer_->write(it->second);
-    if (ret <= 0) {
-      return false;
+    {
+      const auto ret = peer_->write(it->second);
+      if (ret <= 0) {
+        return false;
+      }
     }
     logger_->log_debug("Site2Site Protocol Send handshake properties %s %s", 
it->first, it->second);
   }
@@ -285,10 +301,11 @@ bool RawSiteToSiteClient::handShake() {
   RespondCode code;
   std::string message;
 
-  ret = readRespond(nullptr, code, message);
-
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = readRespond(nullptr, code, message);
+    if (ret <= 0) {
+      return false;
+    }
   }
 
   std::string error;
@@ -310,13 +327,11 @@ bool RawSiteToSiteClient::handShake() {
     // Unknown error
     default:
       logger_->log_error("HandShake Failed because of unknown respond code 
%d", code);
-      ret = -1;
       return false;
   }
 
   // All known error cases handled here
   logger_->log_error("Site2Site HandShake Failed because destination port, %s, 
is %s", port_id_.to_string(), error);
-  ret = -1;
   return false;
 }
 
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index fb24f73..b009c18 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -31,28 +31,21 @@ namespace sitetosite {
 
 int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& 
/*transaction*/, RespondCode &code, std::string &message) {
   uint8_t firstByte;
-
   auto ret = peer_->read(firstByte);
-
-  if (ret <= 0 || firstByte != CODE_SEQUENCE_VALUE_1)
+  if (ret == 0 || ret == static_cast<size_t>(-1) || firstByte != 
CODE_SEQUENCE_VALUE_1)
     return -1;
 
   uint8_t secondByte;
-
   ret = peer_->read(secondByte);
-
-  if (ret <= 0 || secondByte != CODE_SEQUENCE_VALUE_2)
+  if (ret == 0 || ret == static_cast<size_t>(-1) || secondByte != 
CODE_SEQUENCE_VALUE_2)
     return -1;
 
   uint8_t thirdByte;
-
   ret = peer_->read(thirdByte);
-
-  if (ret != static_cast<size_t>(-1))
+  if (ret == static_cast<size_t>(-1))
     return gsl::narrow_cast<int>(ret);
 
   code = (RespondCode) thirdByte;
-
   RespondCodeContext *resCode = this->getRespondCodeContext(code);
   if (!resCode) {
     return -1;
@@ -69,7 +62,6 @@ void SiteToSiteClient::deleteTransaction(const utils::Identifier& transactionID)
   std::shared_ptr<Transaction> transaction;
 
   auto it = this->known_transactions_.find(transactionID);
-
   if (it == known_transactions_.end()) {
     return;
   } else {

Reply via email to