This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 9187c2efd5c26c7c7b654d9346de0959dc53610a
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Thu Feb 12 17:13:12 2026 +0100

    MINIFICPP-2709 Fix site to site receive using HTTP protocol
    
    Closes #2094
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 core-framework/include/http/HTTPStream.h           | 69 +++++++++-------------
 core-framework/src/http/HTTPStream.cpp             | 34 +++++------
 .../Minifi_flow_json_serializer.py                 |  1 +
 .../Minifi_flow_yaml_serializer.py                 |  1 +
 .../include/sitetosite/HttpSiteToSiteClient.h      |  3 +
 libminifi/include/sitetosite/SiteToSiteClient.h    |  7 ++-
 libminifi/src/sitetosite/HttpSiteToSiteClient.cpp  | 40 +++++++++++--
 libminifi/src/sitetosite/SiteToSiteClient.cpp      | 40 +++++--------
 8 files changed, 103 insertions(+), 92 deletions(-)

diff --git a/core-framework/include/http/HTTPStream.h b/core-framework/include/http/HTTPStream.h
index 7a513bd31..cd3e08da2 100644
--- a/core-framework/include/http/HTTPStream.h
+++ b/core-framework/include/http/HTTPStream.h
@@ -48,54 +48,42 @@ class HttpStream : public io::BaseStreamImpl {
   }
 
   const std::shared_ptr<HTTPClient>& getClient() {
-    http_client_future_.get();
+    if (http_client_read_future_.valid()) {
+      http_client_read_future_.get();
+    }
+    if (http_client_write_future_.valid()) {
+      http_client_write_future_.get();
+    }
     return http_client_;
   }
 
   void forceClose() {
-    if (started_) {
-      // lock shouldn't be needed here as call paths currently guarantee
-      // flow, but we should be safe anyway.
-      std::lock_guard<std::mutex> lock(mutex_);
+    std::lock_guard<std::mutex> lock(mutex_);
+    if (read_started_ || write_started_) {
       close();
       http_client_->forceClose();
-      if (http_client_future_.valid()) {
-        http_client_future_.get();
-      } else {
-        logger_->log_warn("Future status already cleared for {}, continuing", 
http_client_->getURL());
-      }
+      read_started_ = false;
+      write_started_ = false;
+    }
 
-      started_ = false;
+    if (http_client_read_future_.valid()) {
+      http_client_read_future_.get();
+    }
+    if (http_client_write_future_.valid()) {
+      http_client_write_future_.get();
     }
   }
 
-  /**
-   * Skip to the specified offset.
-   * @param offset offset to which we will skip
-   */
   void seek(size_t offset) override;
 
   [[nodiscard]] size_t tell() const override;
 
-  [[nodiscard]] size_t size() const override {
-    return written;
-  }
+  [[nodiscard]] size_t size() const override;
 
   using BaseStream::write;
   using BaseStream::read;
 
-  /**
-   * Reads data and places it into buf
-   * @param buf buffer in which we extract data
-   * @param buflen
-   */
   size_t read(std::span<std::byte> buf) override;
-
-  /**
-   * writes value to stream
-   * @param value value to write
-   * @param size size of value
-   */
   size_t write(const uint8_t* value, size_t size) override;
 
   static bool submit_client(const std::shared_ptr<HTTPClient>& client) {
@@ -114,19 +102,22 @@ class HttpStream : public io::BaseStreamImpl {
   }
 
   inline bool isFinished(int seconds = 0) {
-    return http_client_future_.wait_for(std::chrono::seconds(seconds)) == 
std::future_status::ready
+    if (!http_client_read_future_.valid()) {
+      return false;
+    }
+    return http_client_read_future_.wait_for(std::chrono::seconds(seconds)) == 
std::future_status::ready
         && getByteOutputReadCallback()
         && getByteOutputReadCallback()->getSize() == 0
         && getByteOutputReadCallback()->waitingOps();
   }
 
-  /**
-   * Waits for more data to become available.
-   */
   bool waitForDataAvailable() {
+    if (!http_client_read_future_.valid()) {
+      return false;
+    }
     do {
       logger_->log_trace("Waiting for more data");
-    } while (http_client_future_.wait_for(std::chrono::seconds(0)) != 
std::future_status::ready
+    } while (http_client_read_future_.wait_for(std::chrono::seconds(0)) != 
std::future_status::ready
         && getByteOutputReadCallback()
         && getByteOutputReadCallback()->getSize() == 0);
 
@@ -135,16 +126,14 @@ class HttpStream : public io::BaseStreamImpl {
   }
 
  protected:
-  std::vector<uint8_t> array;
-
   std::shared_ptr<HTTPClient> http_client_;
-  std::future<bool> http_client_future_;
-
-  size_t written{0};
+  std::future<bool> http_client_read_future_;
+  std::future<bool> http_client_write_future_;
 
   std::mutex mutex_;
 
-  std::atomic<bool> started_{false};
+  std::atomic<bool> read_started_{false};
+  std::atomic<bool> write_started_{false};
 
  private:
   utils::ByteOutputCallback* getByteOutputReadCallback() {
diff --git a/core-framework/src/http/HTTPStream.cpp b/core-framework/src/http/HTTPStream.cpp
index 61f076e45..69a0bbb7e 100644
--- a/core-framework/src/http/HTTPStream.cpp
+++ b/core-framework/src/http/HTTPStream.cpp
@@ -40,30 +40,28 @@ void HttpStream::close() {
 }
 
 void HttpStream::seek(size_t /*offset*/) {
-  // seek is an unnecessary part of this implementation
   throw std::logic_error{"HttpStream::seek is unimplemented"};
 }
 
 size_t HttpStream::tell() const {
-  // tell is an unnecessary part of this implementation
   throw std::logic_error{"HttpStream::tell is unimplemented"};
 }
 
-// data stream overrides
+[[nodiscard]] size_t HttpStream::size() const {
+  throw std::logic_error{"HttpStream::size is unimplemented"};
+}
 
 size_t HttpStream::write(const uint8_t* value, size_t size) {
   if (size == 0) return 0;
   if (IsNullOrEmpty(value)) {
     return io::STREAM_ERROR;
   }
-  if (!started_) {
-    std::lock_guard<std::mutex> lock(mutex_);
-    if (!started_) {
-      auto callback = std::make_unique<HttpStreamingCallback>();
-      http_client_->setUploadCallback(std::move(callback));
-      http_client_future_ = std::async(std::launch::async, submit_client, 
http_client_);
-      started_ = true;
-    }
+  std::lock_guard<std::mutex> lock(mutex_);
+  if (!write_started_) {
+    auto callback = std::make_unique<HttpStreamingCallback>();
+    http_client_->setUploadCallback(std::move(callback));
+    http_client_write_future_ = std::async(std::launch::async, submit_client, 
http_client_);
+    write_started_ = true;
   }
   if (auto http_callback = 
dynamic_cast<HttpStreamingCallback*>(http_client_->getUploadCallback()))
     http_callback->process(value, size);
@@ -75,14 +73,12 @@ size_t HttpStream::write(const uint8_t* value, size_t size) {
 size_t HttpStream::read(std::span<std::byte> buf) {
   if (buf.empty()) { return 0; }
   if (!IsNullOrEmpty(buf)) {
-    if (!started_) {
-      std::lock_guard<std::mutex> lock(mutex_);
-      if (!started_) {
-        auto read_callback = 
std::make_unique<HTTPReadByteOutputCallback>(66560, true);
-        http_client_future_ = std::async(std::launch::async, 
submit_read_client, http_client_, read_callback.get());
-        http_client_->setReadCallback(std::move(read_callback));
-        started_ = true;
-      }
+    std::lock_guard<std::mutex> lock(mutex_);
+    if (!read_started_) {
+      auto read_callback = std::make_unique<HTTPReadByteOutputCallback>(66560, 
true);
+      http_client_read_future_ = std::async(std::launch::async, 
submit_read_client, http_client_, read_callback.get());
+      http_client_->setReadCallback(std::move(read_callback));
+      read_started_ = true;
     }
     return 
gsl::not_null(getByteOutputReadCallback())->readFully(reinterpret_cast<char*>(buf.data()),
 buf.size());
   } else {
diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py
index 4a64a982e..7082d7820 100644
--- a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py
+++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py
@@ -133,6 +133,7 @@ class Minifi_flow_json_serializer:
                         'targetUri': group.url,
                         'communicationsTimeout': '30 sec',
                         'yieldDuration': '3 sec',
+                        'transportProtocol': group.transport_protocol,
                         'outputPorts': []
                     }
 
diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
index 4c951a051..c3e733d26 100644
--- a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
+++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
@@ -136,6 +136,7 @@ class Minifi_flow_yaml_serializer:
                         'url': group.url,
                         'timeout': '30 sec',
                         'yield period': '3 sec',
+                        'transport protocol': group.transport_protocol,
                         'Output Ports': []
                     }
 
diff --git a/libminifi/include/sitetosite/HttpSiteToSiteClient.h b/libminifi/include/sitetosite/HttpSiteToSiteClient.h
index 1d8446ae5..a235b6b9d 100644
--- a/libminifi/include/sitetosite/HttpSiteToSiteClient.h
+++ b/libminifi/include/sitetosite/HttpSiteToSiteClient.h
@@ -77,6 +77,9 @@ class HttpSiteToSiteClient final : public SiteToSiteClient {
   void deleteTransaction(const utils::Identifier& transaction_id) override;
   void tearDown() override;
 
+ protected:
+  std::pair<uint64_t, uint64_t> readFlowFiles(const 
std::shared_ptr<Transaction>& transaction, core::ProcessSession& session) 
override;
+
  private:
   void setSiteToSiteHeaders(minifi::http::HTTPClient& client);
   void closeTransaction(const utils::Identifier &transaction_id);
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index ab4987943..656f71e65 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -25,6 +25,7 @@
 #include <utility>
 #include <vector>
 #include <optional>
+#include <expected>
 
 #include "Peer.h"
 #include "SiteToSite.h"
@@ -135,6 +136,7 @@ class SiteToSiteClient {
   virtual void deleteTransaction(const utils::Identifier &transaction_id);
   virtual std::optional<SiteToSiteResponse> readResponse(const 
std::shared_ptr<Transaction> &transaction);
   virtual bool writeResponse(const std::shared_ptr<Transaction> &transaction, 
const SiteToSiteResponse& response);
+  virtual std::pair<uint64_t, uint64_t> readFlowFiles(const 
std::shared_ptr<Transaction>& transaction, core::ProcessSession& session);
 
   bool initializeSend(const std::shared_ptr<Transaction>& transaction);
   bool writeAttributesInSendTransaction(io::OutputStream& stream, const 
std::string& transaction_id_str, const std::map<std::string, std::string>& 
attributes);
@@ -187,9 +189,8 @@ class SiteToSiteClient {
   bool completeReceive(const std::shared_ptr<Transaction>& transaction, const 
utils::Identifier& transaction_id);
   bool completeSend(const std::shared_ptr<Transaction>& transaction, const 
utils::Identifier& transaction_id, core::ProcessContext& context);
 
-  bool readFlowFileHeaderData(io::InputStream& stream, const std::string& 
transaction_id, SiteToSiteClient::ReceiveFlowFileHeaderResult& result);
-  std::optional<ReceiveFlowFileHeaderResult> 
receiveFlowFileHeader(io::InputStream& stream, const 
std::shared_ptr<Transaction>& transaction);
-  std::pair<uint64_t, uint64_t> readFlowFiles(const 
std::shared_ptr<Transaction>& transaction, core::ProcessSession& session);
+  std::expected<void, std::string> readFlowFileHeaderData(io::InputStream& 
stream, const std::string& transaction_id, 
SiteToSiteClient::ReceiveFlowFileHeaderResult& result);
+  std::expected<ReceiveFlowFileHeaderResult, std::string> 
receiveFlowFileHeader(io::InputStream& stream, const 
std::shared_ptr<Transaction>& transaction);
 
   std::shared_ptr<core::logging::Logger> 
logger_{core::logging::LoggerFactory<SiteToSiteClient>::getLogger()};
 };
diff --git a/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp b/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp
index 29ea227b4..aacc70609 100644
--- a/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp
@@ -141,7 +141,6 @@ std::shared_ptr<Transaction> HttpSiteToSiteClient::createTransaction(TransferDir
   } else {
     transaction_client = openConnectionForReceive(transaction);
     transaction->setDataAvailable(true);
-    // 201 tells us that data is available. 200 would mean that nothing is 
available.
   }
   gsl_Assert(transaction_client);
 
@@ -352,13 +351,11 @@ void HttpSiteToSiteClient::closeTransaction(const utils::Identifier &transaction
 
   logger_->log_debug("Received {} response code from delete", 
client->getResponseCode());
 
-  if (client->getResponseCode() == 400) {
+  if (client->getResponseCode() >= 400) {
     std::string error(client->getResponseBody().data(), 
client->getResponseBody().size());
 
-    logger_->log_warn("400 received: {}", error);
-    std::stringstream message;
-    message << "Received " << client->getResponseCode() << " from " << 
uri.str();
-    throw Exception(SITE2SITE_EXCEPTION, message.str());
+    logger_->log_warn("{} received: {}", client->getResponseCode(), error);
+    throw Exception(SITE2SITE_EXCEPTION, fmt::format("Received {} from {}", 
client->getResponseCode(), uri.str()));
   }
 
   transaction->close();
@@ -388,4 +385,35 @@ void HttpSiteToSiteClient::setSiteToSiteHeaders(minifi::http::HTTPClient& client
   }
 }
 
+std::pair<uint64_t, uint64_t> HttpSiteToSiteClient::readFlowFiles(const 
std::shared_ptr<Transaction>& transaction, core::ProcessSession& session) {
+  auto http_stream = dynamic_cast<http::HttpStream*>(peer_->getStream());
+  if (!http_stream) {
+    throw Exception(SITE2SITE_EXCEPTION, "Reading flow files failed: stream 
cannot be cast to HTTP stream");
+  }
+
+  std::pair<uint64_t, uint64_t> read_result;
+  try {
+    read_result = SiteToSiteClient::readFlowFiles(transaction, session);
+  } catch (const Exception&) {
+    auto response_code = http_stream->getClientRef()->getResponseCode();
+
+    // 200 tells us that there is no content to read, so we should not treat 
it as an error.
+    // The read fails in this case because the stream does not contain a valid 
response body with the expected format.
+    // Unfortunately there is no way to get the response code before trying to 
read, so we have to let it fail and check the response code afterwards.
+    if (response_code == 200) {
+      logger_->log_debug("Response code 200, no content to read");
+      transaction->setDataAvailable(false);
+      transaction->setState(TransactionState::TRANSACTION_CANCELED);
+      current_code_ = ResponseCode::CANCEL_TRANSACTION;
+      return {0, 0};
+    }
+    throw;
+  }
+
+  if (auto response_code = http_stream->getClientRef()->getResponseCode(); 
response_code >= 400) {
+    throw Exception(SITE2SITE_EXCEPTION, fmt::format("HTTP error code received 
while reading flow files: {}", response_code));
+  }
+  return read_result;
+}
+
 }  // namespace org::apache::nifi::minifi::sitetosite
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index e34c94858..290097b40 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -559,11 +559,10 @@ bool SiteToSiteClient::sendPacket(const DataPacket& packet) {
   return true;
 }
 
-bool SiteToSiteClient::readFlowFileHeaderData(io::InputStream& stream, const 
std::string& transaction_id_str, SiteToSiteClient::ReceiveFlowFileHeaderResult& 
result) {
+std::expected<void, std::string> 
SiteToSiteClient::readFlowFileHeaderData(io::InputStream& stream, const 
std::string& transaction_id_str, SiteToSiteClient::ReceiveFlowFileHeaderResult& 
result) {
   uint32_t num_attributes = 0;
   if (const auto ret = stream.read(num_attributes); ret == 0 || 
io::isError(ret) || num_attributes > MAX_NUM_ATTRIBUTES) {
-    logger_->log_error("Site2Site failed to read number of attributes with 
return code {}, or number of attributes is invalid: {}", ret, num_attributes);
-    return false;
+    return std::unexpected(fmt::format("Site2Site failed to read number of 
attributes with return code {}, or number of attributes is invalid: {}", ret, 
num_attributes));
   }
 
   logger_->log_debug("Site2Site transaction {} receives {} attributes", 
transaction_id_str, num_attributes);
@@ -571,13 +570,11 @@ bool SiteToSiteClient::readFlowFileHeaderData(io::InputStream& stream, const std
     std::string key;
     std::string value;
     if (const auto ret = stream.read(key, true); ret == 0 || io::isError(ret)) 
{
-      logger_->log_error("Site2Site transaction {} failed to read attribute 
key", transaction_id_str);
-      return false;
+      return std::unexpected(fmt::format("Site2Site transaction {} failed to 
read attribute key", transaction_id_str));
     }
 
     if (const auto ret = stream.read(value, true); ret == 0 || 
io::isError(ret)) {
-      logger_->log_error("Site2Site transaction {} failed to read attribute 
value for key {}", transaction_id_str, key);
-      return false;
+      return std::unexpected(fmt::format("Site2Site transaction {} failed to 
read attribute value for key {}", transaction_id_str, key));
     }
 
     result.attributes[key] = value;
@@ -586,32 +583,29 @@ bool SiteToSiteClient::readFlowFileHeaderData(io::InputStream& stream, const std
 
   uint64_t len = 0;
   if (const auto ret = stream.read(len); ret == 0 || io::isError(ret)) {
-    logger_->log_error("Site2Site transaction {} failed to read flow file data 
size", transaction_id_str);
-    return false;
+    return std::unexpected(fmt::format("Site2Site transaction {} failed to 
read flow file data size", transaction_id_str));
   }
 
   result.flow_file_data_size = len;
-  return true;
+  return {};
 }
 
-std::optional<SiteToSiteClient::ReceiveFlowFileHeaderResult> 
SiteToSiteClient::receiveFlowFileHeader(io::InputStream& stream, const 
std::shared_ptr<Transaction>& transaction) {
+std::expected<SiteToSiteClient::ReceiveFlowFileHeaderResult, std::string> 
SiteToSiteClient::receiveFlowFileHeader(io::InputStream& stream, const 
std::shared_ptr<Transaction>& transaction) {
   if (peer_state_ != PeerState::READY) {
     bootstrap();
   }
 
   if (peer_state_ != PeerState::READY) {
-    return std::nullopt;
+    return std::unexpected("Peer state is not ready");
   }
 
   const auto transaction_id_str = transaction->getUUIDStr();
   if (transaction->getState() != TransactionState::TRANSACTION_STARTED && 
transaction->getState() != TransactionState::DATA_EXCHANGED) {
-    logger_->log_warn("Site2Site transaction {} is not at started or exchanged 
state", transaction_id_str);
-    return std::nullopt;
+    return std::unexpected(fmt::format("Site2Site transaction {} is not at 
started or exchanged state", transaction_id_str));
   }
 
   if (transaction->getDirection() != TransferDirection::RECEIVE) {
-    logger_->log_warn("Site2Site transaction {} direction is wrong", 
transaction_id_str);
-    return std::nullopt;
+    return std::unexpected(fmt::format("Site2Site transaction {} direction is 
wrong", transaction_id_str));
   }
 
   ReceiveFlowFileHeaderResult result;
@@ -624,7 +618,7 @@ std::optional<SiteToSiteClient::ReceiveFlowFileHeaderResult> SiteToSiteClient::r
     // if we already have transferred a flow file before, check to see whether 
another one is available
     auto response = readResponse(transaction);
     if (!response) {
-      return std::nullopt;
+      return std::unexpected("Failed to read response");
     }
     if (response->code == ResponseCode::CONTINUE_TRANSACTION) {
       logger_->log_debug("Site2Site transaction {} peer indicate continue 
transaction", transaction_id_str);
@@ -635,8 +629,7 @@ std::optional<SiteToSiteClient::ReceiveFlowFileHeaderResult> SiteToSiteClient::r
       result.eof = true;
       return result;
     } else {
-      logger_->log_debug("Site2Site transaction {} peer indicate wrong 
response code {}", transaction_id_str, 
magic_enum::enum_underlying(response->code));
-      return std::nullopt;
+      return std::unexpected(fmt::format("Site2Site transaction {} peer 
indicate wrong response code {}", transaction_id_str, 
magic_enum::enum_underlying(response->code)));
     }
   }
 
@@ -646,9 +639,8 @@ std::optional<SiteToSiteClient::ReceiveFlowFileHeaderResult> SiteToSiteClient::r
     return result;
   }
 
-  if (!readFlowFileHeaderData(stream, transaction_id_str, result)) {
-    logger_->log_error("Site2Site transaction {} failed to read flow file 
header data", transaction_id_str);
-    return std::nullopt;
+  if (auto ret = readFlowFileHeaderData(stream, transaction_id_str, result); 
!ret.has_value()) {
+    return std::unexpected(fmt::format("Site2Site transaction {} failed to 
read flow file header data: {}", transaction_id_str, ret.error()));
   }
 
   if (result.flow_file_data_size > 0 || !result.attributes.empty()) {
@@ -679,14 +671,14 @@ std::pair<uint64_t, uint64_t> SiteToSiteClient::readFlowFiles(const std::shared_
     compression_stream = 
std::make_unique<CompressionInputStream>(transaction->getStream());
     compression_wrapper_crc_stream = 
std::make_unique<io::CRCStream<io::InputStream>>(gsl::make_not_null(compression_stream.get()));
   }
-  io::InputStream& stream = use_compression_ ?  
static_cast<io::InputStream&>(*compression_wrapper_crc_stream) : 
static_cast<io::InputStream&>(transaction->getStream());
+  io::InputStream& stream = use_compression_ ?  
static_cast<io::InputStream&>(*compression_wrapper_crc_stream) : 
transaction->getStream();
 
   while (true) {
     auto start_time = std::chrono::steady_clock::now();
 
     auto receive_header_result = receiveFlowFileHeader(stream, transaction);
     if (!receive_header_result) {
-      throw Exception(SITE2SITE_EXCEPTION, "Receive Failed " + 
transaction->getUUIDStr());
+      throw Exception(SITE2SITE_EXCEPTION, fmt::format("Receive Failed for {}: 
{}", transaction->getUUIDStr(), receive_header_result.error()));
     }
 
     if (receive_header_result->eof) {

Reply via email to