This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 9187c2efd5c26c7c7b654d9346de0959dc53610a Author: Gabor Gyimesi <[email protected]> AuthorDate: Thu Feb 12 17:13:12 2026 +0100 MINIFICPP-2709 Fix site to site receive using HTTP protocol Closes #2094 Signed-off-by: Marton Szasz <[email protected]> --- core-framework/include/http/HTTPStream.h | 69 +++++++++------------- core-framework/src/http/HTTPStream.cpp | 34 +++++------ .../Minifi_flow_json_serializer.py | 1 + .../Minifi_flow_yaml_serializer.py | 1 + .../include/sitetosite/HttpSiteToSiteClient.h | 3 + libminifi/include/sitetosite/SiteToSiteClient.h | 7 ++- libminifi/src/sitetosite/HttpSiteToSiteClient.cpp | 40 +++++++++++-- libminifi/src/sitetosite/SiteToSiteClient.cpp | 40 +++++-------- 8 files changed, 103 insertions(+), 92 deletions(-) diff --git a/core-framework/include/http/HTTPStream.h b/core-framework/include/http/HTTPStream.h index 7a513bd31..cd3e08da2 100644 --- a/core-framework/include/http/HTTPStream.h +++ b/core-framework/include/http/HTTPStream.h @@ -48,54 +48,42 @@ class HttpStream : public io::BaseStreamImpl { } const std::shared_ptr<HTTPClient>& getClient() { - http_client_future_.get(); + if (http_client_read_future_.valid()) { + http_client_read_future_.get(); + } + if (http_client_write_future_.valid()) { + http_client_write_future_.get(); + } return http_client_; } void forceClose() { - if (started_) { - // lock shouldn't be needed here as call paths currently guarantee - // flow, but we should be safe anyway. - std::lock_guard<std::mutex> lock(mutex_); + std::lock_guard<std::mutex> lock(mutex_); + if (read_started_ || write_started_) { close(); http_client_->forceClose(); - if (http_client_future_.valid()) { - http_client_future_.get(); - } else { - logger_->log_warn("Future status already cleared for {}, continuing", http_client_->getURL()); - } + read_started_ = false; + write_started_ = false; + } - started_ = false; + if (http_client_read_future_.valid()) { + http_client_read_future_.get(); + } + if (http_client_write_future_.valid()) { + http_client_write_future_.get(); } } - /** - * Skip to the specified offset. - * @param offset offset to which we will skip - */ void seek(size_t offset) override; [[nodiscard]] size_t tell() const override; - [[nodiscard]] size_t size() const override { - return written; - } + [[nodiscard]] size_t size() const override; using BaseStream::write; using BaseStream::read; - /** - * Reads data and places it into buf - * @param buf buffer in which we extract data - * @param buflen - */ size_t read(std::span<std::byte> buf) override; - - /** - * writes value to stream - * @param value value to write - * @param size size of value - */ size_t write(const uint8_t* value, size_t size) override; static bool submit_client(const std::shared_ptr<HTTPClient>& client) { @@ -114,19 +102,22 @@ class HttpStream : public io::BaseStreamImpl { } inline bool isFinished(int seconds = 0) { - return http_client_future_.wait_for(std::chrono::seconds(seconds)) == std::future_status::ready + if (!http_client_read_future_.valid()) { + return false; + } + return http_client_read_future_.wait_for(std::chrono::seconds(seconds)) == std::future_status::ready && getByteOutputReadCallback() && getByteOutputReadCallback()->getSize() == 0 && getByteOutputReadCallback()->waitingOps(); } - /** - * Waits for more data to become available. - */ bool waitForDataAvailable() { + if (!http_client_read_future_.valid()) { + return false; + } do { logger_->log_trace("Waiting for more data"); - } while (http_client_future_.wait_for(std::chrono::seconds(0)) != std::future_status::ready + } while (http_client_read_future_.wait_for(std::chrono::seconds(0)) != std::future_status::ready && getByteOutputReadCallback() && getByteOutputReadCallback()->getSize() == 0); @@ -135,16 +126,14 @@ class HttpStream : public io::BaseStreamImpl { } protected: - std::vector<uint8_t> array; - std::shared_ptr<HTTPClient> http_client_; - std::future<bool> http_client_future_; - - size_t written{0}; + std::future<bool> http_client_read_future_; + std::future<bool> http_client_write_future_; std::mutex mutex_; - std::atomic<bool> started_{false}; + std::atomic<bool> read_started_{false}; + std::atomic<bool> write_started_{false}; private: utils::ByteOutputCallback* getByteOutputReadCallback() { diff --git a/core-framework/src/http/HTTPStream.cpp b/core-framework/src/http/HTTPStream.cpp index 61f076e45..69a0bbb7e 100644 --- a/core-framework/src/http/HTTPStream.cpp +++ b/core-framework/src/http/HTTPStream.cpp @@ -40,30 +40,28 @@ void HttpStream::close() { } void HttpStream::seek(size_t /*offset*/) { - // seek is an unnecessary part of this implementation throw std::logic_error{"HttpStream::seek is unimplemented"}; } size_t HttpStream::tell() const { - // tell is an unnecessary part of this implementation throw std::logic_error{"HttpStream::tell is unimplemented"}; } -// data stream overrides +[[nodiscard]] size_t HttpStream::size() const { + throw std::logic_error{"HttpStream::size is unimplemented"}; +} size_t HttpStream::write(const uint8_t* value, size_t size) { if (size == 0) return 0; if (IsNullOrEmpty(value)) { return io::STREAM_ERROR; } - if (!started_) { - std::lock_guard<std::mutex> lock(mutex_); - if (!started_) { - auto callback = std::make_unique<HttpStreamingCallback>(); - http_client_->setUploadCallback(std::move(callback)); - http_client_future_ = std::async(std::launch::async, submit_client, http_client_); - started_ = true; - } + std::lock_guard<std::mutex> lock(mutex_); + if (!write_started_) { + auto callback = std::make_unique<HttpStreamingCallback>(); + http_client_->setUploadCallback(std::move(callback)); + http_client_write_future_ = std::async(std::launch::async, submit_client, http_client_); + write_started_ = true; } if (auto http_callback = dynamic_cast<HttpStreamingCallback*>(http_client_->getUploadCallback())) http_callback->process(value, size); @@ -75,14 +73,12 @@ size_t HttpStream::write(const uint8_t* value, size_t size) { size_t HttpStream::read(std::span<std::byte> buf) { if (buf.empty()) { return 0; } if (!IsNullOrEmpty(buf)) { - if (!started_) { - std::lock_guard<std::mutex> lock(mutex_); - if (!started_) { - auto read_callback = std::make_unique<HTTPReadByteOutputCallback>(66560, true); - http_client_future_ = std::async(std::launch::async, submit_read_client, http_client_, read_callback.get()); - http_client_->setReadCallback(std::move(read_callback)); - started_ = true; - } + std::lock_guard<std::mutex> lock(mutex_); + if (!read_started_) { + auto read_callback = std::make_unique<HTTPReadByteOutputCallback>(66560, true); + http_client_read_future_ = std::async(std::launch::async, submit_read_client, http_client_, read_callback.get()); + http_client_->setReadCallback(std::move(read_callback)); + read_started_ = true; } return gsl::not_null(getByteOutputReadCallback())->readFully(reinterpret_cast<char*>(buf.data()), buf.size()); } else { diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py index 4a64a982e..7082d7820 100644 --- a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py +++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py @@ -133,6 +133,7 @@ class Minifi_flow_json_serializer: 'targetUri': group.url, 'communicationsTimeout': '30 sec', 'yieldDuration': '3 sec', + 'transportProtocol': group.transport_protocol, 'outputPorts': [] } diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py index 4c951a051..c3e733d26 100644 --- a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py +++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py @@ -136,6 +136,7 @@ class Minifi_flow_yaml_serializer: 'url': group.url, 'timeout': '30 sec', 'yield period': '3 sec', + 'transport protocol': group.transport_protocol, 'Output Ports': [] } diff --git a/libminifi/include/sitetosite/HttpSiteToSiteClient.h b/libminifi/include/sitetosite/HttpSiteToSiteClient.h index 1d8446ae5..a235b6b9d 100644 --- a/libminifi/include/sitetosite/HttpSiteToSiteClient.h +++ b/libminifi/include/sitetosite/HttpSiteToSiteClient.h @@ -77,6 +77,9 @@ class HttpSiteToSiteClient final : public SiteToSiteClient { void deleteTransaction(const utils::Identifier& transaction_id) override; void tearDown() override; + protected: + std::pair<uint64_t, uint64_t> readFlowFiles(const std::shared_ptr<Transaction>& transaction, core::ProcessSession& session) override; + private: void setSiteToSiteHeaders(minifi::http::HTTPClient& client); void closeTransaction(const utils::Identifier &transaction_id); diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h index ab4987943..656f71e65 100644 --- a/libminifi/include/sitetosite/SiteToSiteClient.h +++ b/libminifi/include/sitetosite/SiteToSiteClient.h @@ -25,6 +25,7 @@ #include <utility> #include <vector> #include <optional> +#include <expected> #include "Peer.h" #include "SiteToSite.h" @@ -135,6 +136,7 @@ class SiteToSiteClient { virtual void deleteTransaction(const utils::Identifier &transaction_id); virtual std::optional<SiteToSiteResponse> readResponse(const std::shared_ptr<Transaction> &transaction); virtual bool writeResponse(const std::shared_ptr<Transaction> &transaction, const SiteToSiteResponse& response); + virtual std::pair<uint64_t, uint64_t> readFlowFiles(const std::shared_ptr<Transaction>& transaction, core::ProcessSession& session); bool initializeSend(const std::shared_ptr<Transaction>& transaction); bool writeAttributesInSendTransaction(io::OutputStream& stream, const std::string& transaction_id_str, const std::map<std::string, std::string>& attributes); @@ -187,9 +189,8 @@ class SiteToSiteClient { bool completeReceive(const std::shared_ptr<Transaction>& transaction, const utils::Identifier& transaction_id); bool completeSend(const std::shared_ptr<Transaction>& transaction, const utils::Identifier& transaction_id, core::ProcessContext& context); - bool readFlowFileHeaderData(io::InputStream& stream, const std::string& transaction_id, SiteToSiteClient::ReceiveFlowFileHeaderResult& result); - std::optional<ReceiveFlowFileHeaderResult> receiveFlowFileHeader(io::InputStream& stream, const std::shared_ptr<Transaction>& transaction); - std::pair<uint64_t, uint64_t> readFlowFiles(const std::shared_ptr<Transaction>& transaction, core::ProcessSession& session); + std::expected<void, std::string> readFlowFileHeaderData(io::InputStream& stream, const std::string& transaction_id, SiteToSiteClient::ReceiveFlowFileHeaderResult& result); + std::expected<ReceiveFlowFileHeaderResult, std::string> receiveFlowFileHeader(io::InputStream& stream, const std::shared_ptr<Transaction>& transaction); std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<SiteToSiteClient>::getLogger()}; }; diff --git a/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp b/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp index 29ea227b4..aacc70609 100644 --- a/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp +++ b/libminifi/src/sitetosite/HttpSiteToSiteClient.cpp @@ -141,7 +141,6 @@ std::shared_ptr<Transaction> HttpSiteToSiteClient::createTransaction(TransferDir } else { transaction_client = openConnectionForReceive(transaction); transaction->setDataAvailable(true); - // 201 tells us that data is available. 200 would mean that nothing is available. } gsl_Assert(transaction_client); @@ -352,13 +351,11 @@ void HttpSiteToSiteClient::closeTransaction(const utils::Identifier &transaction logger_->log_debug("Received {} response code from delete", client->getResponseCode()); - if (client->getResponseCode() == 400) { + if (client->getResponseCode() >= 400) { std::string error(client->getResponseBody().data(), client->getResponseBody().size()); - logger_->log_warn("400 received: {}", error); - std::stringstream message; - message << "Received " << client->getResponseCode() << " from " << uri.str(); - throw Exception(SITE2SITE_EXCEPTION, message.str()); + logger_->log_warn("{} received: {}", client->getResponseCode(), error); + throw Exception(SITE2SITE_EXCEPTION, fmt::format("Received {} from {}", client->getResponseCode(), uri.str())); } transaction->close(); @@ -388,4 +385,35 @@ void HttpSiteToSiteClient::setSiteToSiteHeaders(minifi::http::HTTPClient& client } } +std::pair<uint64_t, uint64_t> HttpSiteToSiteClient::readFlowFiles(const std::shared_ptr<Transaction>& transaction, core::ProcessSession& session) { + auto http_stream = dynamic_cast<http::HttpStream*>(peer_->getStream()); + if (!http_stream) { + throw Exception(SITE2SITE_EXCEPTION, "Reading flow files failed: stream cannot be cast to HTTP stream"); + } + + std::pair<uint64_t, uint64_t> read_result; + try { + read_result = SiteToSiteClient::readFlowFiles(transaction, session); + } catch (const Exception&) { + auto response_code = http_stream->getClientRef()->getResponseCode(); + + // 200 tells us that there is no content to read, so we should not treat it as an error. + // The read fails in this case because the stream does not contain a valid response body with the expected format. + // Unfortunately there is no way to get the response code before trying to read, so we have to let it fail and check the response code afterwards. + if (response_code == 200) { + logger_->log_debug("Response code 200, no content to read"); + transaction->setDataAvailable(false); + transaction->setState(TransactionState::TRANSACTION_CANCELED); + current_code_ = ResponseCode::CANCEL_TRANSACTION; + return {0, 0}; + } + throw; + } + + if (auto response_code = http_stream->getClientRef()->getResponseCode(); response_code >= 400) { + throw Exception(SITE2SITE_EXCEPTION, fmt::format("HTTP error code received while reading flow files: {}", response_code)); + } + return read_result; +} + } // namespace org::apache::nifi::minifi::sitetosite diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp index e34c94858..290097b40 100644 --- a/libminifi/src/sitetosite/SiteToSiteClient.cpp +++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp @@ -559,11 +559,10 @@ bool SiteToSiteClient::sendPacket(const DataPacket& packet) { return true; } -bool SiteToSiteClient::readFlowFileHeaderData(io::InputStream& stream, const std::string& transaction_id_str, SiteToSiteClient::ReceiveFlowFileHeaderResult& result) { +std::expected<void, std::string> SiteToSiteClient::readFlowFileHeaderData(io::InputStream& stream, const std::string& transaction_id_str, SiteToSiteClient::ReceiveFlowFileHeaderResult& result) { uint32_t num_attributes = 0; if (const auto ret = stream.read(num_attributes); ret == 0 || io::isError(ret) || num_attributes > MAX_NUM_ATTRIBUTES) { - logger_->log_error("Site2Site failed to read number of attributes with return code {}, or number of attributes is invalid: {}", ret, num_attributes); - return false; + return std::unexpected(fmt::format("Site2Site failed to read number of attributes with return code {}, or number of attributes is invalid: {}", ret, num_attributes)); } logger_->log_debug("Site2Site transaction {} receives {} attributes", transaction_id_str, num_attributes); @@ -571,13 +570,11 @@ bool SiteToSiteClient::readFlowFileHeaderData(io::InputStream& stream, const std std::string key; std::string value; if (const auto ret = stream.read(key, true); ret == 0 || io::isError(ret)) { - logger_->log_error("Site2Site transaction {} failed to read attribute key", transaction_id_str); - return false; + return std::unexpected(fmt::format("Site2Site transaction {} failed to read attribute key", transaction_id_str)); } if (const auto ret = stream.read(value, true); ret == 0 || io::isError(ret)) { - logger_->log_error("Site2Site transaction {} failed to read attribute value for key {}", transaction_id_str, key); - return false; + return std::unexpected(fmt::format("Site2Site transaction {} failed to read attribute value for key {}", transaction_id_str, key)); } result.attributes[key] = value; @@ -586,32 +583,29 @@ bool SiteToSiteClient::readFlowFileHeaderData(io::InputStream& stream, const std uint64_t len = 0; if (const auto ret = stream.read(len); ret == 0 || io::isError(ret)) { - logger_->log_error("Site2Site transaction {} failed to read flow file data size", transaction_id_str); - return false; + return std::unexpected(fmt::format("Site2Site transaction {} failed to read flow file data size", transaction_id_str)); } result.flow_file_data_size = len; - return true; + return {}; } -std::optional<SiteToSiteClient::ReceiveFlowFileHeaderResult> SiteToSiteClient::receiveFlowFileHeader(io::InputStream& stream, const std::shared_ptr<Transaction>& transaction) { +std::expected<SiteToSiteClient::ReceiveFlowFileHeaderResult, std::string> SiteToSiteClient::receiveFlowFileHeader(io::InputStream& stream, const std::shared_ptr<Transaction>& transaction) { if (peer_state_ != PeerState::READY) { bootstrap(); } if (peer_state_ != PeerState::READY) { - return std::nullopt; + return std::unexpected("Peer state is not ready"); } const auto transaction_id_str = transaction->getUUIDStr(); if (transaction->getState() != TransactionState::TRANSACTION_STARTED && transaction->getState() != TransactionState::DATA_EXCHANGED) { - logger_->log_warn("Site2Site transaction {} is not at started or exchanged state", transaction_id_str); - return std::nullopt; + return std::unexpected(fmt::format("Site2Site transaction {} is not at started or exchanged state", transaction_id_str)); } if (transaction->getDirection() != TransferDirection::RECEIVE) { - logger_->log_warn("Site2Site transaction {} direction is wrong", transaction_id_str); - return std::nullopt; + return std::unexpected(fmt::format("Site2Site transaction {} direction is wrong", transaction_id_str)); } ReceiveFlowFileHeaderResult result; @@ -624,7 +618,7 @@ std::optional<SiteToSiteClient::ReceiveFlowFileHeaderResult> SiteToSiteClient::r // if we already have transferred a flow file before, check to see whether another one is available auto response = readResponse(transaction); if (!response) { - return std::nullopt; + return std::unexpected("Failed to read response"); } if (response->code == ResponseCode::CONTINUE_TRANSACTION) { logger_->log_debug("Site2Site transaction {} peer indicate continue transaction", transaction_id_str); @@ -635,8 +629,7 @@ std::optional<SiteToSiteClient::ReceiveFlowFileHeaderResult> SiteToSiteClient::r result.eof = true; return result; } else { - logger_->log_debug("Site2Site transaction {} peer indicate wrong response code {}", transaction_id_str, magic_enum::enum_underlying(response->code)); - return std::nullopt; + return std::unexpected(fmt::format("Site2Site transaction {} peer indicate wrong response code {}", transaction_id_str, magic_enum::enum_underlying(response->code))); } } @@ -646,9 +639,8 @@ std::optional<SiteToSiteClient::ReceiveFlowFileHeaderResult> SiteToSiteClient::r return result; } - if (!readFlowFileHeaderData(stream, transaction_id_str, result)) { - logger_->log_error("Site2Site transaction {} failed to read flow file header data", transaction_id_str); - return std::nullopt; + if (auto ret = readFlowFileHeaderData(stream, transaction_id_str, result); !ret.has_value()) { + return std::unexpected(fmt::format("Site2Site transaction {} failed to read flow file header data: {}", transaction_id_str, ret.error())); } if (result.flow_file_data_size > 0 || !result.attributes.empty()) { @@ -679,14 +671,14 @@ std::pair<uint64_t, uint64_t> SiteToSiteClient::readFlowFiles(const std::shared_ compression_stream = std::make_unique<CompressionInputStream>(transaction->getStream()); compression_wrapper_crc_stream = std::make_unique<io::CRCStream<io::InputStream>>(gsl::make_not_null(compression_stream.get())); } - io::InputStream& stream = use_compression_ ? static_cast<io::InputStream&>(*compression_wrapper_crc_stream) : static_cast<io::InputStream&>(transaction->getStream()); + io::InputStream& stream = use_compression_ ? static_cast<io::InputStream&>(*compression_wrapper_crc_stream) : transaction->getStream(); while (true) { auto start_time = std::chrono::steady_clock::now(); auto receive_header_result = receiveFlowFileHeader(stream, transaction); if (!receive_header_result) { - throw Exception(SITE2SITE_EXCEPTION, "Receive Failed " + transaction->getUUIDStr()); + throw Exception(SITE2SITE_EXCEPTION, fmt::format("Receive Failed for {}: {}", transaction->getUUIDStr(), receive_header_result.error())); } if (receive_header_result->eof) {
