adamdebreceni commented on code in PR #2088:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2088#discussion_r2740884123


##########
libminifi/src/utils/file/AssetManager.cpp:
##########
@@ -121,51 +121,74 @@ std::string AssetManager::hash() const {
 
 nonstd::expected<void, std::string> AssetManager::sync(
     const AssetLayout& layout,
-    const std::function<nonstd::expected<std::vector<std::byte>, 
std::string>(std::string_view /*url*/)>& fetch) {
+    const std::function<nonstd::expected<void, std::string>(std::string_view 
/*url*/, const std::filesystem::path& /*tmp_path*/)>& fetch) {
   logger_->log_info("Synchronizing assets");
   std::lock_guard lock(mtx_);
   AssetLayout new_state{
     .digest = state_.digest,
     .assets = {}
   };
-  std::string fetch_errors;
-  std::vector<std::pair<std::filesystem::path, std::vector<std::byte>>> 
new_file_contents;
+  std::string new_asset_errors;
+  std::vector<AssetDescription> new_assets;
   for (auto& new_entry : layout.assets) {
     if (std::find_if(state_.assets.begin(), state_.assets.end(), [&] (auto& 
entry) {return entry.id == new_entry.id;}) == state_.assets.end()) {
       logger_->log_info("Fetching asset (id = '{}', path = '{}') from {}", 
new_entry.id, new_entry.path.string(), new_entry.url);
-      if (auto data = fetch(new_entry.url)) {
-        new_file_contents.emplace_back(new_entry.path, data.value());
+      if (auto status = fetch(new_entry.url, (root_ / new_entry.path).string() 
+ ".part")) {
+        new_assets.emplace_back(new_entry);
         new_state.assets.insert(new_entry);
       } else {
-        logger_->log_error("Failed to fetch asset (id = '{}', path = '{}') 
from {}: {}", new_entry.id, new_entry.path.string(), new_entry.url, 
data.error());
-        fetch_errors += "Failed to fetch '" + new_entry.id + "' from '" + 
new_entry.url + "': " + data.error() + "\n";
+        logger_->log_error("Failed to fetch asset (id = '{}', path = '{}') 
from {}: {}", new_entry.id, new_entry.path.string(), new_entry.url, 
status.error());
+        new_asset_errors += "Failed to fetch '" + new_entry.id + "' from '" + 
new_entry.url + "': " + status.error() + "\n";
       }
     } else {
       logger_->log_info("Asset (id = '{}', path = '{}') already exists", 
new_entry.id, new_entry.path.string());
       new_state.assets.insert(new_entry);
     }
   }
-  if (fetch_errors.empty()) {
-    new_state.digest = layout.digest;
-  }
 
   for (auto& old_entry : state_.assets) {
     if (std::find_if(layout.assets.begin(), layout.assets.end(), [&] (auto& 
entry) {return entry.id == old_entry.id;}) == layout.assets.end()) {
       logger_->log_info("We no longer need asset (id = '{}', path = '{}')", 
old_entry.id, old_entry.path.string());
-      std::filesystem::remove(root_ / old_entry.path);
+      std::error_code ec;
+      std::filesystem::remove(root_ / old_entry.path, ec);
+      if (ec) {
+        logger_->log_error("Failed to delete obsolete asset (id = '{}', path = 
'{}')", old_entry.id, old_entry.path.string());
+      } else {
+        logger_->log_info("Successfully deleted obsolete asset (id = '{}', 
path = '{}')", old_entry.id, old_entry.path.string());
+      }
     }
   }
 
-  for (auto& [path, content] : new_file_contents) {
-    create_dir((root_ / path).parent_path());
-    std::ofstream{root_ / path, std::ios::binary}.write(reinterpret_cast<const 
char*>(content.data()), gsl::narrow<std::streamsize>(content.size()));
+  for (auto& asset : new_assets) {
+    auto full_path = root_ / asset.path;
+    if (utils::file::create_dir(full_path.parent_path()) != 0) {
+      logger_->log_error("Failed to create asset directory '{}'", 
full_path.parent_path());
+      new_asset_errors += fmt::format("Failed to create asset directory '{}'", 
full_path.parent_path()) + "\n";
+      new_state.assets.erase(asset);

Review Comment:
   You are right, I can't really imagine a scenario in which this could happen,
   so I removed it.



##########
C2.md:
##########
@@ -115,6 +115,12 @@ be requested via C2 DESCRIBE manifest command.
     # specify the maximum number of bulletins to send in a heartbeat
     # nifi.c2.flow.info.processor.bulletin.limit=1000
 
+    # Specify the timeout for asset download operations. The entire download
+    # must finish within the specified amount of time. Independently of this
+    # setting, there is a fixed 30-second timeout measured from the last
+    # received data packet.
+    # Setting this to 0 disables the timeout (default).
+    nifi.c2.asset.download.timeout=0s

Review Comment:
   added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to