This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 30fa5d1530fb07ce3fe5e026d04add890ae6d072
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Thu Feb 26 14:20:12 2026 +0100

    MINIFICPP-2726 Fix HTTP S2S no data and error handling
    
    - In case no data is sent we do not send cancel message to the remote
      peer, this is in line with NiFi-NiFi site to site communication
    - Handle errors in case HTTP client submit fails
    - Fix handling of 4xx and 5xx HTTP response codes: fail in case client
      side error code is received, only log error in case server side error
      is received
    - Log additional data for HTTP client communication
    
    Signed-off-by: Ferenc Gerlits <[email protected]>
    
    Closes #2116
---
 .../minifi_test_framework/steps/checking_steps.py  | 10 ++--
 core-framework/src/http/HTTPClient.cpp             |  5 +-
 docker/test/integration/features/s2s.feature       |  4 ++
 libminifi/src/sitetosite/HttpSiteToSiteClient.cpp  | 65 +++++++++++++++-------
 4 files changed, 57 insertions(+), 27 deletions(-)

diff --git a/behave_framework/src/minifi_test_framework/steps/checking_steps.py 
b/behave_framework/src/minifi_test_framework/steps/checking_steps.py
index c6593c1bf..2538953d7 100644
--- a/behave_framework/src/minifi_test_framework/steps/checking_steps.py
+++ b/behave_framework/src/minifi_test_framework/steps/checking_steps.py
@@ -23,7 +23,7 @@ import humanfriendly
 from behave import then, step
 
 from minifi_test_framework.containers.http_proxy_container import HttpProxy
-from minifi_test_framework.core.helpers import wait_for_condition, 
check_condition_after_wait
+from minifi_test_framework.core.helpers import wait_for_condition, 
check_condition_after_wait, log_due_to_failure
 from minifi_test_framework.core.minifi_test_context import 
DEFAULT_MINIFI_CONTAINER_NAME, MinifiTestContext
 
 
@@ -67,17 +67,17 @@ def step_impl(context: MinifiTestContext, content: str, 
directory: str, duration
 def step_impl(context: MinifiTestContext, message: str, duration: str):
     duration_seconds = humanfriendly.parse_timespan(duration)
     time.sleep(duration_seconds)
-    assert message not in 
context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs()
+    assert message not in 
context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs() or 
log_due_to_failure(context)
 
 
 @then("the Minifi logs do not contain errors")
 def step_impl(context: MinifiTestContext):
-    assert "[error]" not in 
context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs() or 
context.containers[DEFAULT_MINIFI_CONTAINER_NAME].log_app_output()
+    assert "[error]" not in 
context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs() or 
log_due_to_failure(context)
 
 
 @then("the Minifi logs do not contain warnings")
 def step_impl(context: MinifiTestContext):
-    assert "[warning]" not in 
context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs() or 
context.containers[DEFAULT_MINIFI_CONTAINER_NAME].log_app_output()
+    assert "[warning]" not in 
context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs() or 
log_due_to_failure(context)
 
 
 @then("the Minifi logs contain the following message: '{message}' in less than 
{duration}")
@@ -93,7 +93,7 @@ def step_impl(context: MinifiTestContext, message: str, 
duration: str):
 def step_impl(context, log_message, count, duration):
     duration_seconds = humanfriendly.parse_timespan(duration)
     time.sleep(duration_seconds)
-    assert 
context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs().count(log_message) 
== count or context.containers[DEFAULT_MINIFI_CONTAINER_NAME].log_app_output()
+    assert 
context.containers[DEFAULT_MINIFI_CONTAINER_NAME].get_logs().count(log_message) 
== count or log_due_to_failure(context)
 
 
 @then("the Minifi logs match the following regex: \"{regex}\" in less than 
{duration}")
diff --git a/core-framework/src/http/HTTPClient.cpp 
b/core-framework/src/http/HTTPClient.cpp
index 76e5d1116..fae2a78c9 100644
--- a/core-framework/src/http/HTTPClient.cpp
+++ b/core-framework/src/http/HTTPClient.cpp
@@ -357,7 +357,7 @@ bool HTTPClient::submit() {
   }
 
   curl_easy_setopt(http_session_.get(), CURLOPT_URL, url_.c_str());
-  logger_->log_debug("Submitting to {}", url_);
+  logger_->log_debug("Submitting to {} {}", method_ ? 
magic_enum::enum_name(*method_) : "NONE", url_);
   if (read_callback_ == nullptr) {
     curl_easy_setopt(http_session_.get(), CURLOPT_WRITEFUNCTION, 
&HTTPRequestResponse::receiveWrite);
     curl_easy_setopt(http_session_.get(), CURLOPT_WRITEDATA, 
static_cast<void*>(&content_));
@@ -381,12 +381,11 @@ bool HTTPClient::submit() {
     logger_->log_error("HTTP operation timed out, with absolute timeout {}\n", 
absolute_timeout);
   }
   if (res_ != CURLE_OK) {
-    logger_->log_info("{}", request_headers_.size());
     logger_->log_error("curl_easy_perform() failed {} on {}, error code {}\n", 
curl_easy_strerror(res_), url_, magic_enum::enum_underlying(res_));
     return false;
   }
 
-  logger_->log_debug("Finished with {}", url_);
+  logger_->log_debug("Finished with {} {}", method_ ? 
magic_enum::enum_name(*method_) : "NONE", url_);
   return true;
 }
 
diff --git a/docker/test/integration/features/s2s.feature 
b/docker/test/integration/features/s2s.feature
index 9b3321d4f..711a520e3 100644
--- a/docker/test/integration/features/s2s.feature
+++ b/docker/test/integration/features/s2s.feature
@@ -209,6 +209,7 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S 
protocol
 
     Then a flowfile with the content "test" is placed in the monitored 
directory in less than 90 seconds
     And the Minifi logs do not contain the following message: "ProcessSession 
rollback" after 1 seconds
+    And the Minifi logs do not contain the following message: "response code 
500" after 1 seconds
 
   Scenario: A NiFi instance produces and transfers data to a MiNiFi instance 
via s2s
     Given a file with the content "test" is present in "/tmp/input"
@@ -245,6 +246,7 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S 
protocol
 
     Then a flowfile with the content "test" is placed in the monitored 
directory in less than 90 seconds
     And the Minifi logs do not contain the following message: "ProcessSession 
rollback" after 1 seconds
+    And the Minifi logs do not contain the following message: "response code 
500" after 1 seconds
 
   Scenario: A NiFi instance produces and transfers data to a MiNiFi instance 
via s2s with SSL config defined in minifi.properties
     Given a file with the content "test" is present in "/tmp/input"
@@ -285,6 +287,7 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S 
protocol
 
     Then a flowfile with the content "test" is placed in the monitored 
directory in less than 90 seconds
     And the Minifi logs do not contain the following message: "ProcessSession 
rollback" after 1 seconds
+    And the Minifi logs do not contain the following message: "response code 
500" after 1 seconds
 
   Scenario: A MiNiFi instance produces and transfers data to a NiFi instance 
via s2s using compression
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
@@ -370,3 +373,4 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S 
protocol
 
     Then a flowfile with the content "test" is placed in the monitored 
directory in less than 90 seconds
     And the Minifi logs do not contain the following message: "ProcessSession 
rollback" after 1 seconds
+    And the Minifi logs do not contain the following message: "response code 
500" after 1 seconds
diff --git a/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp 
b/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp
index aacc70609..e2862afd9 100644
--- a/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp
@@ -102,7 +102,10 @@ std::shared_ptr<Transaction> 
HttpSiteToSiteClient::createTransaction(TransferDir
   client->setRequestHeader("Accept", "application/json");
   client->setRequestHeader("Transfer-Encoding", "chunked");
   client->setPostFields("");
-  client->submit();
+  if (!client->submit()) {
+    logger_->log_warn("Failed to submit create transaction request for 
transaction {}", uri.str());
+    return nullptr;
+  }
 
   if (auto http_stream = dynamic_cast<http::HttpStream*>(peer_->getStream())) {
     logger_->log_debug("Closing {}", http_stream->getClientRef()->getURL());
@@ -110,7 +113,7 @@ std::shared_ptr<Transaction> 
HttpSiteToSiteClient::createTransaction(TransferDir
 
   if (client->getResponseCode() != 201) {
     peer_->setStream(nullptr);
-    logger_->log_debug("Could not create transaction, received {}", 
client->getResponseCode());
+    logger_->log_debug("Could not create transaction, received response code 
{}", client->getResponseCode());
     return nullptr;
   }
   // parse the headers
@@ -242,7 +245,10 @@ std::optional<std::vector<PeerStatus>> 
HttpSiteToSiteClient::getPeerList() {
 
   setSiteToSiteHeaders(*client);
 
-  client->submit();
+  if (!client->submit()) {
+    logger_->log_warn("Failed to submit get peer list request {}", uri.str());
+    return std::nullopt;
+  }
 
   if (client->getResponseCode() == 200) {
     return parsePeerStatuses(logger_, 
std::string(client->getResponseBody().data(), 
client->getResponseBody().size()), port_id_);
@@ -314,6 +320,11 @@ void HttpSiteToSiteClient::closeTransaction(const 
utils::Identifier &transaction
     return;
   }
 
+  const auto guard = gsl::finally([&transaction]() {
+    transaction->close();
+    transaction->decrementCurrentTransfers();
+  });
+
   logger_->log_trace("Site to Site closing transaction {}", 
transaction->getUUIDStr());
 
   bool data_received = transaction->getDirection() == 
TransferDirection::RECEIVE && (current_code_ == 
ResponseCode::CONFIRM_TRANSACTION || current_code_ == 
ResponseCode::TRANSACTION_FINISHED);
@@ -326,7 +337,9 @@ void HttpSiteToSiteClient::closeTransaction(const 
utils::Identifier &transaction
   if (transaction->getState() == TransactionState::TRANSACTION_CONFIRMED || 
data_received) {
     code = ResponseCode::CONFIRM_TRANSACTION;
   } else if (transaction->getCurrentTransfers() == 0 && 
!transaction->isDataAvailable()) {
-    code = ResponseCode::CANCEL_TRANSACTION;
+    // If there was no data to send, the transaction is removed on server 
side, no need to send delete request.
+    logger_->log_debug("Transaction {} canceled with no transfers, skipping 
DELETE to server", transaction->getUUIDStr());
+    return;
   } else {
     std::string directon = transaction->getDirection() == 
TransferDirection::RECEIVE ? "Receive" : "Send";
     logger_->log_error("Transaction {} to be closed is in unexpected state. 
Direction: {}, transfers: {}, bytes: {}, state: {}",
@@ -347,19 +360,21 @@ void HttpSiteToSiteClient::closeTransaction(const 
utils::Identifier &transaction
   setSiteToSiteHeaders(*client);
   client->setConnectionTimeout(std::chrono::milliseconds(5000));
   client->setRequestHeader("Accept", "application/json");
-  client->submit();
-
-  logger_->log_debug("Received {} response code from delete", 
client->getResponseCode());
-
-  if (client->getResponseCode() >= 400) {
-    std::string error(client->getResponseBody().data(), 
client->getResponseBody().size());
 
-    logger_->log_warn("{} received: {}", client->getResponseCode(), error);
-    throw Exception(SITE2SITE_EXCEPTION, fmt::format("Received {} from {}", 
client->getResponseCode(), uri.str()));
+  if (!client->submit()) {
+    logger_->log_warn("Failed to submit delete transaction request for 
transaction {}", transaction_id.to_string());
+  } else {
+    if (client->getResponseCode() >= 400) {
+      const std::string error(client->getResponseBody().data(), 
client->getResponseBody().size());
+      const auto message = fmt::format("Received response code {} while 
deleting transaction {}: {}", client->getResponseCode(), 
transaction_id.to_string(), error);
+      if (client->getResponseCode() < 500) {
+        logger_->log_error(fmt::runtime(message));
+        throw Exception(SITE2SITE_EXCEPTION, message);
+      } else {
+        logger_->log_warn(fmt::runtime(message));
+      }
+    }
   }
-
-  transaction->close();
-  transaction->decrementCurrentTransfers();
 }
 
 void HttpSiteToSiteClient::deleteTransaction(const utils::Identifier& 
transaction_id) {
@@ -395,6 +410,8 @@ std::pair<uint64_t, uint64_t> 
HttpSiteToSiteClient::readFlowFiles(const std::sha
   try {
     read_result = SiteToSiteClient::readFlowFiles(transaction, session);
   } catch (const Exception&) {
+    // Wait for the HTTP response to fully complete before checking the 
response code
+    http_stream->getClient();
     auto response_code = http_stream->getClientRef()->getResponseCode();
 
     // 200 tells us that there is no content to read, so we should not treat 
it as an error.
@@ -407,11 +424,21 @@ std::pair<uint64_t, uint64_t> 
HttpSiteToSiteClient::readFlowFiles(const std::sha
       current_code_ = ResponseCode::CANCEL_TRANSACTION;
       return {0, 0};
     }
-    throw;
-  }
 
-  if (auto response_code = http_stream->getClientRef()->getResponseCode(); 
response_code >= 400) {
-    throw Exception(SITE2SITE_EXCEPTION, fmt::format("HTTP error code received 
while reading flow files: {}", response_code));
+    if (response_code >= 400) {
+      const std::string error = 
std::string(http_stream->getClientRef()->getResponseBody().data(), 
http_stream->getClientRef()->getResponseBody().size());
+      const auto message = fmt::format("Received response code {} while 
reading flow files for transaction {}: {}", response_code, 
transaction->getUUIDStr(), error);
+      if (response_code < 500) {
+        logger_->log_error(fmt::runtime(message));
+        throw Exception(SITE2SITE_EXCEPTION, message);
+      } else {
+        logger_->log_warn(fmt::runtime(message));
+        transaction->setState(TransactionState::TRANSACTION_ERROR);
+        current_code_ = ResponseCode::CANCEL_TRANSACTION;
+        return {0, 0};
+      }
+    }
+    throw;
   }
   return read_result;
 }

Reply via email to