lordgamez commented on code in PR #1966:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1966#discussion_r2387246992


##########
libminifi/src/sitetosite/SiteToSiteClient.cpp:
##########
@@ -20,735 +20,740 @@
 #include <map>
 #include <string>
 #include <memory>
+#include <ranges>
 
 #include "utils/gsl.h"
 #include "utils/Enum.h"
 
 namespace org::apache::nifi::minifi::sitetosite {
 
-int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& 
/*transaction*/, RespondCode &code, std::string &message) {
-  uint8_t firstByte = 0;
-  {
-    const auto ret = peer_->read(firstByte);
-    if (ret == 0 || io::isError(ret) || firstByte != CODE_SEQUENCE_VALUE_1)
-      return -1;
+std::optional<SiteToSiteResponse> SiteToSiteClient::readResponse(const 
std::shared_ptr<Transaction>& /*transaction*/) {
+  uint8_t result_byte = 0;
+  if (const auto ret = peer_->read(result_byte); ret == 0 || io::isError(ret) 
|| result_byte != CODE_SEQUENCE_VALUE_1) {
+    logger_->log_error("Site2Site read response failed: invalid code sequence 
1 value");
+    return std::nullopt;
   }
 
-  uint8_t secondByte = 0;
-  {
-    const auto ret = peer_->read(secondByte);
-    if (ret == 0 || io::isError(ret) || secondByte != CODE_SEQUENCE_VALUE_2)
-      return -1;
+  if (const auto ret = peer_->read(result_byte); ret == 0 || io::isError(ret) 
|| result_byte != CODE_SEQUENCE_VALUE_2) {
+    logger_->log_error("Site2Site read response failed: invalid code sequence 
2 value");
+    return std::nullopt;
   }
 
-  uint8_t thirdByte = 0;
-  {
-    const auto ret = peer_->read(thirdByte);
-    if (ret == 0 || io::isError(ret))
-      return static_cast<int>(ret);
+  if (const auto ret = peer_->read(result_byte); ret == 0 || io::isError(ret)) 
{
+    logger_->log_error("Site2Site read response failed: failed to read 
response code");
+    return std::nullopt;
   }
 
-  code = static_cast<RespondCode>(thirdByte);
-  RespondCodeContext *resCode = this->getRespondCodeContext(code);
-  if (!resCode) {
-    return -1;
+  SiteToSiteResponse response;
+  if (auto code = magic_enum::enum_cast<ResponseCode>(result_byte)) {
+    response.code = *code;
+  } else {
+    logger_->log_error("Site2Site read response failed: invalid response 
code");
+    return std::nullopt;
   }
-  if (resCode->hasDescription) {
-    const auto ret = peer_->read(message);
-    if (ret == 0 || io::isError(ret))
-      return -1;
+
+  const ResponseCodeContext* response_code_context = 
getResponseCodeContext(response.code);
+  if (!response_code_context) {
+    logger_->log_error("Site2Site read response failed: invalid response code 
context");
+    return std::nullopt;
   }
-  return gsl::narrow<int>(3 + message.size());
+  if (response_code_context->has_description) {
+    if (const auto ret = peer_->read(response.message); ret == 0 || 
io::isError(ret)) {
+      logger_->log_error("Site2Site read response failed: failed to read 
response message");
+      return std::nullopt;
+    }
+  }
+  return response;
 }
 
-void SiteToSiteClient::deleteTransaction(const utils::Identifier& 
transactionID) {
+void SiteToSiteClient::handleTransactionError(const 
std::shared_ptr<Transaction>& transaction, core::ProcessContext& context, const 
std::exception& exception) {
+  if (transaction) {
+    deleteTransaction(transaction->getUUID());
+  }
+  context.yield();
+  tearDown();
+  logger_->log_warn("Caught Exception, type: {}, what: {}", 
typeid(exception).name(), exception.what());
+}
+
+void SiteToSiteClient::deleteTransaction(const utils::Identifier& 
transaction_id) {
   std::shared_ptr<Transaction> transaction;
 
-  auto it = this->known_transactions_.find(transactionID);
+  auto it = known_transactions_.find(transaction_id);
   if (it == known_transactions_.end()) {
+    logger_->log_warn("Site2Site transaction id '{}' not found for delete", 
transaction_id.to_string());
     return;
   } else {
     transaction = it->second;
   }
 
   logger_->log_debug("Site2Site delete transaction {}", 
transaction->getUUIDStr());
-  known_transactions_.erase(transactionID);
+  known_transactions_.erase(transaction_id);
 }
 
-int SiteToSiteClient::writeResponse(const std::shared_ptr<Transaction>& 
/*transaction*/, RespondCode code, const std::string& message) {
-  RespondCodeContext *resCode = this->getRespondCodeContext(code);
-  if (!resCode) {
-    return -1;
+bool SiteToSiteClient::writeResponse(const std::shared_ptr<Transaction>& 
/*transaction*/, const SiteToSiteResponse& response) {
+  const ResponseCodeContext* response_code_context = 
getResponseCodeContext(response.code);
+  if (!response_code_context) {
+    return false;
   }
 
-  {
-    const std::array<uint8_t, 3> codeSeq = { CODE_SEQUENCE_VALUE_1, 
CODE_SEQUENCE_VALUE_2, static_cast<uint8_t>(code) };
-    const auto ret = peer_->write(codeSeq.data(), 3);
-    if (ret != 3)
-      return -1;
+  const std::array<uint8_t, 3> code_sequence = { CODE_SEQUENCE_VALUE_1, 
CODE_SEQUENCE_VALUE_2, magic_enum::enum_underlying(response.code) };
+  const auto ret = peer_->write(code_sequence.data(), 3);
+  if (ret != 3) {
+    return false;
   }
 
-  if (resCode->hasDescription) {
-    const auto ret = peer_->write(message);
-    if (io::isError(ret)) return -1;
-    if (ret == 0) return 0;
-    return 3 + gsl::narrow<int>(ret);
-  } else {
-    return 3;
+  if (response_code_context->has_description) {
+    return !(io::isError(peer_->write(response.message)));
   }
+  return true;
 }
 
 bool SiteToSiteClient::transferFlowFiles(core::ProcessContext& context, 
core::ProcessSession& session) {
   auto flow = session.get();
-
-  std::shared_ptr<Transaction> transaction = nullptr;
-
   if (!flow) {
     return false;
   }
 
-  if (peer_state_ != READY) {
-    if (!bootstrap())
+  if (peer_state_ != PeerState::READY) {
+    if (!bootstrap()) {
       return false;
+    }
   }
 
-  if (peer_state_ != READY) {
+  if (peer_state_ != PeerState::READY) {
     context.yield();
     tearDown();
     throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with 
peer");
   }
 
-  // Create the transaction
-  transaction = createTransaction(SEND);
+  auto transaction = createTransaction(TransferDirection::SEND);
   if (transaction == nullptr) {
     context.yield();
     tearDown();
     throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
   }
-  utils::Identifier transactionID = transaction->getUUID();
-
-  bool continueTransaction = true;
+  utils::Identifier transaction_id = transaction->getUUID();
   std::chrono::high_resolution_clock::time_point transaction_started_at = 
std::chrono::high_resolution_clock::now();
 
   try {
-    while (continueTransaction) {
+    while (true) {
       auto start_time = std::chrono::steady_clock::now();
-      std::string payload;
-      DataPacket packet(getLogger(), transaction, flow->getAttributes(), 
payload);
 
-      int16_t resp = send(transactionID, &packet, flow, &session);
-      if (resp == -1) {
+      if (!sendFlowFile(transaction, *flow, session)) {
         throw Exception(SITE2SITE_EXCEPTION, "Send Failed");
       }
 
-      logger_->log_debug("Site2Site transaction {} send flow record {}", 
transactionID.to_string(), flow->getUUIDStr());
-      if (resp == 0) {
-        auto end_time = std::chrono::steady_clock::now();
-        std::string transitUri = peer_->getURL() + "/" + flow->getUUIDStr();
-        std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote 
Host=" + peer_->getHostName();
-        session.getProvenanceReporter()->send(*flow, transitUri, details, 
std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time), 
false);
-      }
+      logger_->log_debug("Site2Site transaction {} send flow record {}", 
transaction_id.to_string(), flow->getUUIDStr());
+      auto end_time = std::chrono::steady_clock::now();
+      std::string transit_uri = peer_->getURL() + "/" + flow->getUUIDStr();
+      std::string details = "urn:nifi:" + flow->getUUIDStr() + "Remote Host=" 
+ peer_->getHostName();
+      session.getProvenanceReporter()->send(*flow, transit_uri, details, 
std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time), 
false);
       session.remove(flow);
 
       std::chrono::nanoseconds transfer_duration = 
std::chrono::high_resolution_clock::now() - transaction_started_at;
-      if (transfer_duration > _batchSendNanos)
+      if (transfer_duration > batch_send_nanos_) {
         break;
+      }
 
       flow = session.get();
-
       if (!flow) {
-        continueTransaction = false;
+        break;
       }
-    }  // while true
+    }
 
-    if (!confirm(transactionID)) {
-      throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed for " + 
transactionID.to_string());
+    if (!confirm(transaction_id)) {
+      throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed for " + 
transaction_id.to_string());
     }
-    if (!complete(context, transactionID)) {
-      throw Exception(SITE2SITE_EXCEPTION, "Complete Failed for " + 
transactionID.to_string());
+    if (!complete(context, transaction_id)) {
+      throw Exception(SITE2SITE_EXCEPTION, "Complete Failed for " + 
transaction_id.to_string());
     }
-    logger_->log_debug("Site2Site transaction {} successfully sent flow record 
{}, content bytes {}", transactionID.to_string(), 
transaction->total_transfers_, transaction->_bytes);
-  } catch (std::exception &exception) {
-    if (transaction)
-      deleteTransaction(transactionID);
-    context.yield();
-    tearDown();
-    logger_->log_debug("Caught Exception during 
SiteToSiteClient::transferFlowFiles, type: {}, what: {}", 
typeid(exception).name(), exception.what());
-    throw;
-  } catch (...) {
-    if (transaction)
-      deleteTransaction(transactionID);
-    context.yield();
-    tearDown();
-    logger_->log_debug("Caught Exception during 
SiteToSiteClient::transferFlowFiles, type: {}", getCurrentExceptionTypeName());
+    logger_->log_debug("Site2Site transaction {} successfully sent flow record 
{}, content bytes {}", transaction_id.to_string(), 
transaction->getCurrentTransfers(), transaction->getBytes());
+  } catch (const std::exception& exception) {
+    handleTransactionError(transaction, context, exception);

Review Comment:
   I think it would be good to use `gsl::finally` for the `deleteTransaction` 
   call, but in the failure case the deletion has to run before `tearDown`, so 
   `gsl::finally` cannot be used in all cases. Everything else in 
   `handleTransactionError` should only run in error cases, so I would keep it 
   this way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to