This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit af585c2645512b027038240ef0f8df6f3e640837
Author: Adam Debreceni <[email protected]>
AuthorDate: Thu Jan 29 16:17:14 2026 +0100

    MINIFICPP-2705 - Configurable timeout, download assets directly to disk
    
    Closes #2088
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 C2.md                                              |  6 ++
 conf/minifi.properties.in                          |  6 ++
 core-framework/include/http/BaseHTTPClient.h       | 16 ++++-
 core-framework/include/http/HTTPClient.h           |  9 ++-
 core-framework/include/http/HTTPStream.h           | 18 ++++--
 core-framework/include/utils/ByteArrayCallback.h   |  4 +-
 core-framework/src/http/BaseHTTPClient.cpp         |  6 +-
 core-framework/src/http/HTTPClient.cpp             | 10 +--
 core-framework/src/http/HTTPStream.cpp             |  6 +-
 core-framework/src/utils/ByteArrayCallback.cpp     |  4 +-
 libminifi/include/c2/C2Protocol.h                  |  6 +-
 libminifi/include/c2/protocols/RESTSender.h        |  3 +-
 libminifi/include/utils/file/AssetManager.h        |  3 +-
 libminifi/src/c2/C2Agent.cpp                       | 74 ++++++++++++++--------
 libminifi/src/c2/protocols/RESTSender.cpp          | 52 ++++++++++++++-
 libminifi/src/utils/file/AssetManager.cpp          | 49 +++++++++-----
 .../include/minifi-cpp/properties/Configuration.h  |  1 +
 17 files changed, 200 insertions(+), 73 deletions(-)

diff --git a/C2.md b/C2.md
index 8ec0c9562..f934cd0e7 100644
--- a/C2.md
+++ b/C2.md
@@ -115,6 +115,12 @@ be requested via C2 DESCRIBE manifest command.
     # specify the maximum number of bulletins to send in a heartbeat
     # nifi.c2.flow.info.processor.bulletin.limit=1000
 
+    # Specify the timeout for asset download operations. The entire download must
+    # finish within the specified amount of time. There is a separate, fixed 30 second
+    # timeout measured from the last received data packet.
+    # Setting this to 0 disables the timeout (default).
+    nifi.c2.asset.download.timeout=0s
+
 #### Flow Id and URL
 
 Flow id and URL are usually retrieved from the C2 server. These identify the last updated flow version and where the flow was downloaded from. These properties are persisted in the minifi.properties file.
diff --git a/conf/minifi.properties.in b/conf/minifi.properties.in
index 7ae002fbc..636632e8c 100644
--- a/conf/minifi.properties.in
+++ b/conf/minifi.properties.in
@@ -127,6 +127,12 @@ nifi.c2.full.heartbeat=false
 # specify the maximum number of bulletins to send in a heartbeat
 #nifi.c2.flow.info.processor.bulletin.limit=1000
 
+# Specify the timeout for asset download operations. The entire download must
+# finish within the specified amount of time. There is a separate, fixed 30 second
+# timeout measured from the last received data packet.
+# Setting this to 0 disables the timeout (default).
+# nifi.c2.asset.download.timeout=0s
+
 ## enable the controller socket provider on port 9998
 ## off by default.
 #controller.socket.enable=true
diff --git a/core-framework/include/http/BaseHTTPClient.h 
b/core-framework/include/http/BaseHTTPClient.h
index 84c7bbb7c..1438ab1d6 100644
--- a/core-framework/include/http/BaseHTTPClient.h
+++ b/core-framework/include/http/BaseHTTPClient.h
@@ -106,11 +106,23 @@ class HTTPUploadStreamContentsCallback : public 
HTTPUploadCallback {
   std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<HTTPUploadStreamContentsCallback>::getLogger();
 };
 
-class HTTPReadCallback : public utils::ByteOutputCallback {
+class HTTPReadCallback {
  public:
-  using ByteOutputCallback::ByteOutputCallback;
+  virtual bool process(std::span<const char> data) = 0;
+  virtual ~HTTPReadCallback() = default;
 
   std::atomic<bool> stop = false;
+};
+
+class HTTPReadByteOutputCallback : public HTTPReadCallback, public 
utils::ByteOutputCallback {
+ public:
+  using ByteOutputCallback::ByteOutputCallback;
+
+  bool process(std::span<const char> data) override {
+    ByteOutputCallback::write(data.data(), data.size());
+    return true;
+  }
+
   std::atomic<size_t> pos = 0;
 };
 
diff --git a/core-framework/include/http/HTTPClient.h 
b/core-framework/include/http/HTTPClient.h
index b38d4b0db..9339a5b21 100644
--- a/core-framework/include/http/HTTPClient.h
+++ b/core-framework/include/http/HTTPClient.h
@@ -99,6 +99,10 @@ class HTTPClient : public BaseHTTPClient, public 
core::ConnectableImpl {
 
   void setReadTimeout(std::chrono::milliseconds timeout) override;
 
+  void setAbsoluteTimeout(std::optional<std::chrono::milliseconds> timeout) {
+    absolute_timeout_ = timeout;
+  }
+
   void setUploadCallback(std::unique_ptr<HTTPUploadCallback> callback) 
override;
 
   void setReadCallback(std::unique_ptr<HTTPReadCallback> callback);
@@ -226,9 +230,7 @@ class HTTPClient : public BaseHTTPClient, public 
core::ConnectableImpl {
 
   void configure_secure_connection();
 
-  std::chrono::milliseconds getAbsoluteTimeout() const { return 
3*read_timeout_; }
-
-  HTTPReadCallback content_{std::numeric_limits<size_t>::max()};
+  HTTPReadByteOutputCallback content_{std::numeric_limits<size_t>::max()};
 
   std::shared_ptr<minifi::controllers::SSLContextServiceInterface> 
ssl_context_service_;
   std::string url_;
@@ -236,6 +238,7 @@ class HTTPClient : public BaseHTTPClient, public 
core::ConnectableImpl {
 
   std::chrono::milliseconds connect_timeout_{std::chrono::seconds(30)};
   std::chrono::milliseconds read_timeout_{std::chrono::seconds(30)};
+  std::optional<std::chrono::milliseconds> absolute_timeout_;
 
   HTTPResponseData response_data_;
 
diff --git a/core-framework/include/http/HTTPStream.h 
b/core-framework/include/http/HTTPStream.h
index 8bb4329b0..7a513bd31 100644
--- a/core-framework/include/http/HTTPStream.h
+++ b/core-framework/include/http/HTTPStream.h
@@ -115,9 +115,9 @@ class HttpStream : public io::BaseStreamImpl {
 
   inline bool isFinished(int seconds = 0) {
     return http_client_future_.wait_for(std::chrono::seconds(seconds)) == 
std::future_status::ready
-        && http_client_->getReadCallback()
-        && http_client_->getReadCallback()->getSize() == 0
-        && http_client_->getReadCallback()->waitingOps();
+        && getByteOutputReadCallback()
+        && getByteOutputReadCallback()->getSize() == 0
+        && getByteOutputReadCallback()->waitingOps();
   }
 
   /**
@@ -127,11 +127,11 @@ class HttpStream : public io::BaseStreamImpl {
     do {
       logger_->log_trace("Waiting for more data");
     } while (http_client_future_.wait_for(std::chrono::seconds(0)) != 
std::future_status::ready
-        && http_client_->getReadCallback()
-        && http_client_->getReadCallback()->getSize() == 0);
+        && getByteOutputReadCallback()
+        && getByteOutputReadCallback()->getSize() == 0);
 
-    return http_client_->getReadCallback()
-        && http_client_->getReadCallback()->getSize() > 0;
+    return getByteOutputReadCallback()
+        && getByteOutputReadCallback()->getSize() > 0;
   }
 
  protected:
@@ -147,6 +147,10 @@ class HttpStream : public io::BaseStreamImpl {
   std::atomic<bool> started_{false};
 
  private:
+  utils::ByteOutputCallback* getByteOutputReadCallback() {
+    return 
dynamic_cast<utils::ByteOutputCallback*>(http_client_->getReadCallback());
+  }
+
   std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<HttpStream>::getLogger();
 };
 }  // namespace org::apache::nifi::minifi::http
diff --git a/core-framework/include/utils/ByteArrayCallback.h 
b/core-framework/include/utils/ByteArrayCallback.h
index 76d34146d..d07b7bf1d 100644
--- a/core-framework/include/utils/ByteArrayCallback.h
+++ b/core-framework/include/utils/ByteArrayCallback.h
@@ -111,12 +111,12 @@ class ByteOutputCallback {
 
   bool waitingOps();
 
-  virtual void write(char *data, size_t size);
+  virtual void write(const char *data, size_t size);
 
   size_t readFully(char *buffer, size_t size);
 
  protected:
-  inline void write_and_notify(char *data, size_t size);
+  inline void write_and_notify(const char *data, size_t size);
 
   inline size_t read_current_str(char *buffer, size_t size);
 
diff --git a/core-framework/src/http/BaseHTTPClient.cpp 
b/core-framework/src/http/BaseHTTPClient.cpp
index b0ce9d382..f2fad4aff 100644
--- a/core-framework/src/http/BaseHTTPClient.cpp
+++ b/core-framework/src/http/BaseHTTPClient.cpp
@@ -173,8 +173,10 @@ size_t HTTPRequestResponse::receiveWrite(char *data, 
size_t size, size_t nmemb,
     if (callback->stop) {
       return CALLBACK_ABORT;
     }
-    callback->write(data, (size * nmemb));
-    return (size * nmemb);
+    if (!callback->process(std::span(data, size * nmemb))) {
+      return CALLBACK_ABORT;
+    }
+    return size * nmemb;
   } catch (...) {
     return CALLBACK_ABORT;
   }
diff --git a/core-framework/src/http/HTTPClient.cpp 
b/core-framework/src/http/HTTPClient.cpp
index af8d965ea..76e5d1116 100644
--- a/core-framework/src/http/HTTPClient.cpp
+++ b/core-framework/src/http/HTTPClient.cpp
@@ -334,9 +334,11 @@ bool HTTPClient::submit() {
 
   response_data_.clear();
 
+  const auto absolute_timeout = absolute_timeout_.value_or(3 * read_timeout_);
+
   curl_easy_setopt(http_session_.get(), CURLOPT_NOSIGNAL, 1);
   curl_easy_setopt(http_session_.get(), CURLOPT_CONNECTTIMEOUT_MS, 
connect_timeout_.count());
-  curl_easy_setopt(http_session_.get(), CURLOPT_TIMEOUT_MS, 
getAbsoluteTimeout().count());
+  curl_easy_setopt(http_session_.get(), CURLOPT_TIMEOUT_MS, 
absolute_timeout.count());
 
   if (read_timeout_ > 0ms) {
     progress_.reset();
@@ -376,7 +378,7 @@ bool HTTPClient::submit() {
   response_data_.response_code = http_code;
   curl_easy_getinfo(http_session_.get(), CURLINFO_CONTENT_TYPE, 
&response_data_.response_content_type);
   if (res_ == CURLE_OPERATION_TIMEDOUT) {
-    logger_->log_error("HTTP operation timed out, with absolute timeout {}\n", 
getAbsoluteTimeout());
+    logger_->log_error("HTTP operation timed out, with absolute timeout {}\n", 
absolute_timeout);
   }
   if (res_ != CURLE_OK) {
     logger_->log_info("{}", request_headers_.size());
@@ -398,8 +400,8 @@ const char *HTTPClient::getContentType() {
 
 const std::vector<char> &HTTPClient::getResponseBody() {
   if (response_data_.response_body.empty()) {
-    if (read_callback_) {
-      response_data_.response_body = read_callback_->to_string();
+    if (auto byte_output_callback = 
dynamic_cast<utils::ByteOutputCallback*>(read_callback_.get())) {
+      response_data_.response_body = byte_output_callback->to_string();
     } else {
       response_data_.response_body = content_.to_string();
     }
diff --git a/core-framework/src/http/HTTPStream.cpp 
b/core-framework/src/http/HTTPStream.cpp
index 32e1857df..61f076e45 100644
--- a/core-framework/src/http/HTTPStream.cpp
+++ b/core-framework/src/http/HTTPStream.cpp
@@ -33,7 +33,7 @@ HttpStream::HttpStream(std::shared_ptr<HTTPClient> client)
 }
 
 void HttpStream::close() {
-  if (auto read_callback = http_client_->getReadCallback())
+  if (auto read_callback = getByteOutputReadCallback())
     read_callback->close();
   if (auto upload_callback = http_client_->getUploadCallback())
     upload_callback->close();
@@ -78,13 +78,13 @@ size_t HttpStream::read(std::span<std::byte> buf) {
     if (!started_) {
       std::lock_guard<std::mutex> lock(mutex_);
       if (!started_) {
-        auto read_callback = std::make_unique<HTTPReadCallback>(66560, true);
+        auto read_callback = 
std::make_unique<HTTPReadByteOutputCallback>(66560, true);
         http_client_future_ = std::async(std::launch::async, 
submit_read_client, http_client_, read_callback.get());
         http_client_->setReadCallback(std::move(read_callback));
         started_ = true;
       }
     }
-    return 
http_client_->getReadCallback()->readFully(reinterpret_cast<char*>(buf.data()), 
buf.size());
+    return 
gsl::not_null(getByteOutputReadCallback())->readFully(reinterpret_cast<char*>(buf.data()),
 buf.size());
   } else {
     return io::STREAM_ERROR;
   }
diff --git a/core-framework/src/utils/ByteArrayCallback.cpp 
b/core-framework/src/utils/ByteArrayCallback.cpp
index 8446e9a27..647d7e5c9 100644
--- a/core-framework/src/utils/ByteArrayCallback.cpp
+++ b/core-framework/src/utils/ByteArrayCallback.cpp
@@ -62,7 +62,7 @@ bool ByteOutputCallback::waitingOps() {
   return true;
 }
 
-void ByteOutputCallback::write(char *data, size_t size) {
+void ByteOutputCallback::write(const char *data, size_t size) {
   if (!read_started_) {
     std::unique_lock<std::recursive_mutex> lock(vector_lock_);
     spinner_.wait(lock, [&] {
@@ -73,7 +73,7 @@ void ByteOutputCallback::write(char *data, size_t size) {
   write_and_notify(data, size);
 }
 
-void ByteOutputCallback::write_and_notify(char *data, size_t size) {
+void ByteOutputCallback::write_and_notify(const char *data, size_t size) {
   queue_.enqueue(std::string(data, size));
   size_ += size;
   total_written_ += size;
diff --git a/libminifi/include/c2/C2Protocol.h 
b/libminifi/include/c2/C2Protocol.h
index 87de2220d..e5e08fdfe 100644
--- a/libminifi/include/c2/C2Protocol.h
+++ b/libminifi/include/c2/C2Protocol.h
@@ -72,8 +72,10 @@ class C2Protocol : public core::ConnectableImpl {
    */
   virtual C2Payload consumePayload(const C2Payload &operation, Direction 
direction = TRANSMIT, bool async = false) = 0;
 
-  virtual C2Payload fetch(const std::string& url, const 
std::vector<std::string>& /*accepted_formats*/ = {}, bool async = false) {
-    return consumePayload(url, C2Payload(Operation::transfer, true), 
Direction::RECEIVE, async);
+  virtual bool fetch(const std::string& url, const std::vector<std::string>& 
accepted_formats, std::function<bool(std::span<const char> chunk)> 
chunk_callback) = 0;
+
+  bool fetch(const std::string& url, std::function<bool(std::span<const char> 
chunk)> chunk_callback) {
+    return fetch(url, {}, std::move(chunk_callback));
   }
 
   /**
diff --git a/libminifi/include/c2/protocols/RESTSender.h 
b/libminifi/include/c2/protocols/RESTSender.h
index f20283ae3..1114ea605 100644
--- a/libminifi/include/c2/protocols/RESTSender.h
+++ b/libminifi/include/c2/protocols/RESTSender.h
@@ -52,7 +52,7 @@ class RESTSender : public RESTProtocol, public C2Protocol {
 
   C2Payload consumePayload(const C2Payload &payload, Direction direction, bool 
async) override;
 
-  C2Payload fetch(const std::string& url, const std::vector<std::string>& 
accepted_formats, bool async) override;
+  bool fetch(const std::string& url, const std::vector<std::string>& 
accepted_formats, std::function<bool(std::span<const char> chunk)> 
chunk_callback) override;
 
   void update(const std::shared_ptr<Configure> &configure) override;
 
@@ -77,6 +77,7 @@ class RESTSender : public RESTProtocol, public C2Protocol {
   RequestEncoding req_encoding_ = RequestEncoding::none;
 
  private:
+  std::chrono::milliseconds asset_download_timeout_{0};
   std::shared_ptr<core::logging::Logger> logger_ = 
core::logging::LoggerFactory<RESTSender>::getLogger();
 };
 
diff --git a/libminifi/include/utils/file/AssetManager.h 
b/libminifi/include/utils/file/AssetManager.h
index a2e47384a..e90561c2e 100644
--- a/libminifi/include/utils/file/AssetManager.h
+++ b/libminifi/include/utils/file/AssetManager.h
@@ -53,7 +53,8 @@ class AssetManager {
  public:
   explicit AssetManager(const Configure& configuration);
 
-  nonstd::expected<void, std::string> sync(const AssetLayout& layout, const 
std::function<nonstd::expected<std::vector<std::byte>, 
std::string>(std::string_view /*url*/)>& fetch);
+  nonstd::expected<void, std::string> sync(const AssetLayout& layout,
+      const std::function<nonstd::expected<void, std::string>(std::string_view 
/*url*/, const std::filesystem::path& /*tmp_path*/)>& fetch);
 
   std::string hash() const;
 
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 4ac14730b..7f5dc679a 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -321,6 +321,33 @@ struct C2DebugBundleError : public C2TransferError {
   using C2TransferError::C2TransferError;
 };
 
+nonstd::expected<void, std::string> fetchAssetAsFile(C2Protocol& protocol, 
const std::string& url, const std::filesystem::path& file_path, const 
std::shared_ptr<core::logging::Logger>& logger) {
+  if (utils::file::create_dir(file_path.parent_path()) != 0) {
+    return nonstd::make_unexpected(fmt::format("Failed to create directory 
'{}'", file_path.parent_path().string()));
+  }
+
+  std::ofstream file{file_path, std::ofstream::binary};
+  if (!file) {
+    return nonstd::make_unexpected(fmt::format("Failed to open file to write 
'{}'", file_path.string()));
+  }
+  bool success = protocol.fetch(url, [&] (std::span<const char> chunk) {
+    file.write(chunk.data(), gsl::narrow<std::streamsize>(chunk.size()));
+    return file.good();
+  });
+  file.close();
+  if (!file || !success) {
+    std::error_code ec;
+    std::filesystem::remove(file_path, ec);
+    if (ec) {
+      logger->log_error("Failed remove partial asset file '{}'", 
file_path.string());
+    } else {
+      logger->log_info("Successfully removed partial asset file '{}'", 
file_path.string());
+    }
+    return nonstd::make_unexpected(fmt::format("Failed to fetch asset from 
'{}'", url));
+  }
+  return {};
+}
+
 }  // namespace
 
 void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) {
@@ -828,21 +855,13 @@ void C2Agent::handle_sync(const 
org::apache::nifi::minifi::c2::C2ContentResponse
     });
   }
 
-  auto fetch = [&] (std::string_view url) -> 
nonstd::expected<std::vector<std::byte>, std::string> {
+  auto result = asset_manager_->sync(asset_layout, [&] (std::string_view url, 
const std::filesystem::path& file_path) -> nonstd::expected<void, std::string> {
     auto resolved_url = resolveUrl(std::string{url});
     if (!resolved_url) {
       return nonstd::make_unexpected("Couldn't resolve url");
     }
-    C2Payload file_response = protocol_->fetch(resolved_url.value());
-
-    if (file_response.getStatus().getState() != 
state::UpdateState::READ_COMPLETE) {
-      return nonstd::make_unexpected("Failed to fetch file from " + 
resolved_url.value());
-    }
-
-    return std::move(file_response).moveRawData();
-  };
-
-  auto result = asset_manager_->sync(asset_layout, fetch);
+    return fetchAssetAsFile(*protocol_, resolved_url.value(), file_path, 
logger_);
+  });
   if (!result) {
     send_error(result.error());
     return;
@@ -1012,9 +1031,18 @@ std::optional<std::string> C2Agent::fetchFlow(const 
std::string& uri) const {
     return std::nullopt;
   }
 
-  C2Payload response = protocol_->fetch(resolved_url.value(), 
update_sink_->getSupportedConfigurationFormats());
+  std::string flow_content;
+  bool success = protocol_->fetch(resolved_url.value(), 
update_sink_->getSupportedConfigurationFormats(), [&] (std::span<const char> 
chunk) {
+    flow_content.append(chunk.data(), chunk.size());
+    return true;
+  });
+
+  if (!success) {
+    logger_->log_error("Could not fetch flow content from '{}'", uri);
+    return std::nullopt;
+  }
 
-  return response.getRawDataAsString();
+  return flow_content;
 }
 
 std::optional<std::string> C2Agent::getFlowIdFromConfigUpdate(const 
C2ContentResponse &resp) {
@@ -1140,25 +1168,21 @@ void C2Agent::handleAssetUpdate(const 
C2ContentResponse& resp) {
     return;
   }
 
-  C2Payload file_response = protocol_->fetch(url);
+  std::filesystem::path tmp_file{file_path.string() + ".part"};
 
-  if (file_response.getStatus().getState() != 
state::UpdateState::READ_COMPLETE) {
-    send_error("Failed to fetch asset from '" + url + "'");
+  auto fetch_status = fetchAssetAsFile(*protocol_, url, tmp_file, logger_);
+  if (!fetch_status) {
+    send_error(fetch_status.error());
     return;
   }
 
-  auto raw_data = std::move(file_response).moveRawData();
-  // ensure directory exists for file
-  if (utils::file::create_dir(file_path.parent_path()) != 0) {
-    send_error("Failed to create directory '" + 
file_path.parent_path().string() + "'");
+  std::error_code ec;
+  std::filesystem::rename(tmp_file, file_path, ec);
+  if (ec) {
+    send_error(fmt::format("Failed to move temporary asset file '{}' to '{}'", 
tmp_file.string(), file_path.string()));
     return;
   }
 
-  {
-    std::ofstream file{file_path, std::ofstream::binary};
-    file.write(reinterpret_cast<const char*>(raw_data.data()), 
gsl::narrow<std::streamsize>(raw_data.size()));
-  }
-
   C2Payload response(Operation::acknowledge, 
state::UpdateState::FULLY_APPLIED, resp.ident, true);
   enqueue_c2_response(std::move(response));
 }
diff --git a/libminifi/src/c2/protocols/RESTSender.cpp 
b/libminifi/src/c2/protocols/RESTSender.cpp
index a9835663e..709f81589 100644
--- a/libminifi/src/c2/protocols/RESTSender.cpp
+++ b/libminifi/src/c2/protocols/RESTSender.cpp
@@ -75,6 +75,8 @@ void 
RESTSender::initialize(core::controller::ControllerServiceProvider* control
       logger_->log_debug("Request encoding is not specified, using default 
'{}'", magic_enum::enum_name(RequestEncoding::none));
       req_encoding_ = RequestEncoding::none;
     }
+    asset_download_timeout_ = 
(configure->get(Configuration::nifi_c2_asset_download_timeout)
+        | utils::andThen([] (const auto& s) { return parsing::parseDuration(s) 
| utils::toOptional(); })).value_or(0s);
   }
   logger_->log_debug("Submitting to {}", rest_uri_);
 }
@@ -173,7 +175,7 @@ C2Payload RESTSender::sendPayload(const std::string& url, 
const Direction direct
   }
 
   if (payload.getOperation() == Operation::transfer) {
-    auto read = 
std::make_unique<http::HTTPReadCallback>(std::numeric_limits<size_t>::max());
+    auto read = 
std::make_unique<http::HTTPReadByteOutputCallback>(std::numeric_limits<size_t>::max());
     client.setReadCallback(std::move(read));
     if (accepted_formats && !accepted_formats->empty()) {
       client.setRequestHeader("Accept", utils::string::join(", ", 
accepted_formats.value()));
@@ -207,8 +209,52 @@ C2Payload RESTSender::sendPayload(const std::string& url, 
const Direction direct
   }
 }
 
-C2Payload RESTSender::fetch(const std::string& url, const 
std::vector<std::string>& accepted_formats, bool /*async*/) {
-  return sendPayload(url, Direction::RECEIVE, C2Payload(Operation::transfer, 
true), std::nullopt, accepted_formats);
+namespace {
+
+class ForwardingHTTPReadCallback : public http::HTTPReadCallback {
+ public:
+  explicit ForwardingHTTPReadCallback(std::function<bool(std::span<const char> 
chunk)> callback): callback_(std::move(callback)) {}
+  bool process(std::span<const char> data) override {
+    return callback_(data);
+  }
+
+ private:
+  std::function<bool(std::span<const char> chunk)> callback_;
+};
+
+}  // namespace
+
+bool RESTSender::fetch(const std::string& url, const std::vector<std::string>& 
accepted_formats, std::function<bool(std::span<const char> chunk)> 
chunk_callback) {
+  if (url.empty()) {
+    return false;
+  }
+  http::HTTPClient client(url, ssl_context_service_);
+  client.setKeepAliveProbe(http::KeepAliveProbeData{2s, 2s});
+  client.setConnectionTimeout(2s);
+  client.set_request_method(http::HttpRequestMethod::Get);
+  client.setAbsoluteTimeout(asset_download_timeout_);
+  if (url.starts_with("https://")) {
+    if (!ssl_context_service_) {
+      setSecurityContext(client, http::HttpRequestMethod::Get, url);
+    } else {
+      client.initialize(http::HttpRequestMethod::Get, url, 
ssl_context_service_);
+    }
+  }
+  auto read = 
std::make_unique<ForwardingHTTPReadCallback>(std::move(chunk_callback));
+  client.setReadCallback(std::move(read));
+  if (!accepted_formats.empty()) {
+    client.setRequestHeader("Accept", utils::string::join(", ", 
accepted_formats));
+  }
+  bool is_okay = client.submit();
+  int64_t response_code = client.getResponseCode();
+  const bool client_error = 400 <= response_code && response_code < 500;
+  const bool server_error = 500 <= response_code && response_code < 600;
+  if (client_error || server_error) {
+    logger_->log_error("Error response code '{}' from '{}'", response_code, 
url);
+  } else {
+    logger_->log_debug("Response code '{}' from '{}'", response_code, url);
+  }
+  return is_okay && !client_error && !server_error;
 }
 
 REGISTER_RESOURCE(RESTSender, DescriptionOnly);
diff --git a/libminifi/src/utils/file/AssetManager.cpp 
b/libminifi/src/utils/file/AssetManager.cpp
index 531825a38..dc1793033 100644
--- a/libminifi/src/utils/file/AssetManager.cpp
+++ b/libminifi/src/utils/file/AssetManager.cpp
@@ -121,51 +121,68 @@ std::string AssetManager::hash() const {
 
 nonstd::expected<void, std::string> AssetManager::sync(
     const AssetLayout& layout,
-    const std::function<nonstd::expected<std::vector<std::byte>, 
std::string>(std::string_view /*url*/)>& fetch) {
+    const std::function<nonstd::expected<void, std::string>(std::string_view 
/*url*/, const std::filesystem::path& /*tmp_path*/)>& fetch) {
   logger_->log_info("Synchronizing assets");
   std::lock_guard lock(mtx_);
   AssetLayout new_state{
     .digest = state_.digest,
     .assets = {}
   };
-  std::string fetch_errors;
-  std::vector<std::pair<std::filesystem::path, std::vector<std::byte>>> 
new_file_contents;
+  std::string new_asset_errors;
+  std::vector<AssetDescription> new_assets;
   for (auto& new_entry : layout.assets) {
     if (std::find_if(state_.assets.begin(), state_.assets.end(), [&] (auto& 
entry) {return entry.id == new_entry.id;}) == state_.assets.end()) {
       logger_->log_info("Fetching asset (id = '{}', path = '{}') from {}", 
new_entry.id, new_entry.path.string(), new_entry.url);
-      if (auto data = fetch(new_entry.url)) {
-        new_file_contents.emplace_back(new_entry.path, data.value());
+      if (auto status = fetch(new_entry.url, (root_ / new_entry.path).string() 
+ ".part")) {
+        new_assets.emplace_back(new_entry);
         new_state.assets.insert(new_entry);
       } else {
-        logger_->log_error("Failed to fetch asset (id = '{}', path = '{}') 
from {}: {}", new_entry.id, new_entry.path.string(), new_entry.url, 
data.error());
-        fetch_errors += "Failed to fetch '" + new_entry.id + "' from '" + 
new_entry.url + "': " + data.error() + "\n";
+        logger_->log_error("Failed to fetch asset (id = '{}', path = '{}') 
from {}: {}", new_entry.id, new_entry.path.string(), new_entry.url, 
status.error());
+        new_asset_errors += "Failed to fetch '" + new_entry.id + "' from '" + 
new_entry.url + "': " + status.error() + "\n";
       }
     } else {
       logger_->log_info("Asset (id = '{}', path = '{}') already exists", 
new_entry.id, new_entry.path.string());
       new_state.assets.insert(new_entry);
     }
   }
-  if (fetch_errors.empty()) {
-    new_state.digest = layout.digest;
-  }
 
   for (auto& old_entry : state_.assets) {
     if (std::find_if(layout.assets.begin(), layout.assets.end(), [&] (auto& 
entry) {return entry.id == old_entry.id;}) == layout.assets.end()) {
       logger_->log_info("We no longer need asset (id = '{}', path = '{}')", 
old_entry.id, old_entry.path.string());
-      std::filesystem::remove(root_ / old_entry.path);
+      std::error_code ec;
+      std::filesystem::remove(root_ / old_entry.path, ec);
+      if (ec) {
+        logger_->log_error("Failed to delete obsolete asset (id = '{}', path = 
'{}')", old_entry.id, old_entry.path.string());
+      } else {
+        logger_->log_info("Successfully deleted obsolete asset (id = '{}', 
path = '{}')", old_entry.id, old_entry.path.string());
+      }
     }
   }
 
-  for (auto& [path, content] : new_file_contents) {
-    create_dir((root_ / path).parent_path());
-    std::ofstream{root_ / path, std::ios::binary}.write(reinterpret_cast<const 
char*>(content.data()), gsl::narrow<std::streamsize>(content.size()));
+  for (auto& asset : new_assets) {
+    auto full_path = root_ / asset.path;
+    std::error_code ec;
+    std::filesystem::rename(full_path.string() + ".part", full_path, ec);
+    if (ec) {
+      logger_->log_error("Failed to move temporary asset file '{}' to '{}'", 
full_path.string() + ".part", full_path.string());
+      new_asset_errors += fmt::format("Failed to move temporary asset file 
'{}' to '{}'", full_path.string() + ".part", full_path.string()) + "\n";
+      new_state.assets.erase(asset);
+    } else {
+      logger_->log_info("Successfully downloaded asset to file '{}'", 
full_path.string());
+    }
+  }
+
+  if (new_asset_errors.empty()) {
+    new_state.digest = layout.digest;
+  } else {
+    new_state.digest.clear();
   }
 
   state_ = std::move(new_state);
   persist();
 
-  if (!fetch_errors.empty()) {
-    return nonstd::make_unexpected(fetch_errors);
+  if (!new_asset_errors.empty()) {
+    return nonstd::make_unexpected(new_asset_errors);
   }
 
   return {};
diff --git a/minifi-api/include/minifi-cpp/properties/Configuration.h 
b/minifi-api/include/minifi-cpp/properties/Configuration.h
index 3fccff2c2..8ed94061b 100644
--- a/minifi-api/include/minifi-cpp/properties/Configuration.h
+++ b/minifi-api/include/minifi-cpp/properties/Configuration.h
@@ -122,6 +122,7 @@ class Configuration : public virtual Properties {
   static constexpr const char *nifi_c2_rest_heartbeat_minimize_updates = 
"nifi.c2.rest.heartbeat.minimize.updates";
   static constexpr const char *nifi_c2_rest_request_encoding = 
"nifi.c2.rest.request.encoding";
   static constexpr const char *nifi_c2_flow_info_processor_bulletin_limit = 
"nifi.c2.flow.info.processor.bulletin.limit";
+  static constexpr const char *nifi_c2_asset_download_timeout = 
"nifi.c2.asset.download.timeout";
 
   // state management options
   static constexpr const char *nifi_state_storage_local = 
"nifi.state.storage.local";

Reply via email to