fgerlits commented on code in PR #2088:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2088#discussion_r2740663299
##########
libminifi/src/utils/file/AssetManager.cpp:
##########
@@ -121,51 +121,74 @@ std::string AssetManager::hash() const {
nonstd::expected<void, std::string> AssetManager::sync(
const AssetLayout& layout,
- const std::function<nonstd::expected<std::vector<std::byte>,
std::string>(std::string_view /*url*/)>& fetch) {
+ const std::function<nonstd::expected<void, std::string>(std::string_view
/*url*/, const std::filesystem::path& /*tmp_path*/)>& fetch) {
logger_->log_info("Synchronizing assets");
std::lock_guard lock(mtx_);
AssetLayout new_state{
.digest = state_.digest,
.assets = {}
};
- std::string fetch_errors;
- std::vector<std::pair<std::filesystem::path, std::vector<std::byte>>>
new_file_contents;
+ std::string new_asset_errors;
+ std::vector<AssetDescription> new_assets;
for (auto& new_entry : layout.assets) {
if (std::find_if(state_.assets.begin(), state_.assets.end(), [&] (auto&
entry) {return entry.id == new_entry.id;}) == state_.assets.end()) {
logger_->log_info("Fetching asset (id = '{}', path = '{}') from {}",
new_entry.id, new_entry.path.string(), new_entry.url);
- if (auto data = fetch(new_entry.url)) {
- new_file_contents.emplace_back(new_entry.path, data.value());
+ if (auto status = fetch(new_entry.url, (root_ / new_entry.path).string()
+ ".part")) {
+ new_assets.emplace_back(new_entry);
new_state.assets.insert(new_entry);
} else {
- logger_->log_error("Failed to fetch asset (id = '{}', path = '{}')
from {}: {}", new_entry.id, new_entry.path.string(), new_entry.url,
data.error());
- fetch_errors += "Failed to fetch '" + new_entry.id + "' from '" +
new_entry.url + "': " + data.error() + "\n";
+ logger_->log_error("Failed to fetch asset (id = '{}', path = '{}')
from {}: {}", new_entry.id, new_entry.path.string(), new_entry.url,
status.error());
+ new_asset_errors += "Failed to fetch '" + new_entry.id + "' from '" +
new_entry.url + "': " + status.error() + "\n";
}
} else {
logger_->log_info("Asset (id = '{}', path = '{}') already exists",
new_entry.id, new_entry.path.string());
new_state.assets.insert(new_entry);
}
}
- if (fetch_errors.empty()) {
- new_state.digest = layout.digest;
- }
for (auto& old_entry : state_.assets) {
if (std::find_if(layout.assets.begin(), layout.assets.end(), [&] (auto&
entry) {return entry.id == old_entry.id;}) == layout.assets.end()) {
logger_->log_info("We no longer need asset (id = '{}', path = '{}')",
old_entry.id, old_entry.path.string());
- std::filesystem::remove(root_ / old_entry.path);
+ std::error_code ec;
+ std::filesystem::remove(root_ / old_entry.path, ec);
+ if (ec) {
+ logger_->log_error("Failed to delete obsolete asset (id = '{}', path =
'{}')", old_entry.id, old_entry.path.string());
+ } else {
+ logger_->log_info("Successfully deleted obsolete asset (id = '{}',
path = '{}')", old_entry.id, old_entry.path.string());
+ }
}
}
- for (auto& [path, content] : new_file_contents) {
- create_dir((root_ / path).parent_path());
- std::ofstream{root_ / path, std::ios::binary}.write(reinterpret_cast<const
char*>(content.data()), gsl::narrow<std::streamsize>(content.size()));
+ for (auto& asset : new_assets) {
+ auto full_path = root_ / asset.path;
+ if (utils::file::create_dir(full_path.parent_path()) != 0) {
+ logger_->log_error("Failed to create asset directory '{}'",
full_path.parent_path());
+ new_asset_errors += fmt::format("Failed to create asset directory '{}'",
full_path.parent_path()) + "\n";
+ new_state.assets.erase(asset);
Review Comment:
Can this happen? `fetch()` has already created this directory when it wrote
the `.part` file, and if the directory creation had failed, `fetch()` would
have failed too, so the asset would never have been added to `new_assets`.
##########
C2.md:
##########
@@ -115,6 +115,12 @@ be requested via C2 DESCRIBE manifest command.
# specify the maximum number of bulletins to send in a heartbeat
# nifi.c2.flow.info.processor.bulletin.limit=1000
+ # Specify timeout for asset download operations. The entire download must
+ # finish in the specified amount of time. There is a separate fixed 30
second
+ # timeout from the last received data packet.
+ # setting to 0 disables the timeout (default)
+ nifi.c2.asset.download.timeout=0s
Review Comment:
I think we should add this to the default `conf/minifi.properties.in` file,
too.
##########
libminifi/src/utils/file/AssetManager.cpp:
##########
@@ -121,51 +121,74 @@ std::string AssetManager::hash() const {
nonstd::expected<void, std::string> AssetManager::sync(
const AssetLayout& layout,
- const std::function<nonstd::expected<std::vector<std::byte>,
std::string>(std::string_view /*url*/)>& fetch) {
+ const std::function<nonstd::expected<void, std::string>(std::string_view
/*url*/, const std::filesystem::path& /*tmp_path*/)>& fetch) {
logger_->log_info("Synchronizing assets");
std::lock_guard lock(mtx_);
AssetLayout new_state{
.digest = state_.digest,
.assets = {}
};
- std::string fetch_errors;
- std::vector<std::pair<std::filesystem::path, std::vector<std::byte>>>
new_file_contents;
+ std::string new_asset_errors;
+ std::vector<AssetDescription> new_assets;
for (auto& new_entry : layout.assets) {
if (std::find_if(state_.assets.begin(), state_.assets.end(), [&] (auto&
entry) {return entry.id == new_entry.id;}) == state_.assets.end()) {
logger_->log_info("Fetching asset (id = '{}', path = '{}') from {}",
new_entry.id, new_entry.path.string(), new_entry.url);
- if (auto data = fetch(new_entry.url)) {
- new_file_contents.emplace_back(new_entry.path, data.value());
+ if (auto status = fetch(new_entry.url, (root_ / new_entry.path).string()
+ ".part")) {
+ new_assets.emplace_back(new_entry);
new_state.assets.insert(new_entry);
} else {
- logger_->log_error("Failed to fetch asset (id = '{}', path = '{}')
from {}: {}", new_entry.id, new_entry.path.string(), new_entry.url,
data.error());
- fetch_errors += "Failed to fetch '" + new_entry.id + "' from '" +
new_entry.url + "': " + data.error() + "\n";
+ logger_->log_error("Failed to fetch asset (id = '{}', path = '{}')
from {}: {}", new_entry.id, new_entry.path.string(), new_entry.url,
status.error());
+ new_asset_errors += "Failed to fetch '" + new_entry.id + "' from '" +
new_entry.url + "': " + status.error() + "\n";
}
} else {
logger_->log_info("Asset (id = '{}', path = '{}') already exists",
new_entry.id, new_entry.path.string());
new_state.assets.insert(new_entry);
}
}
- if (fetch_errors.empty()) {
- new_state.digest = layout.digest;
- }
for (auto& old_entry : state_.assets) {
if (std::find_if(layout.assets.begin(), layout.assets.end(), [&] (auto&
entry) {return entry.id == old_entry.id;}) == layout.assets.end()) {
logger_->log_info("We no longer need asset (id = '{}', path = '{}')",
old_entry.id, old_entry.path.string());
- std::filesystem::remove(root_ / old_entry.path);
+ std::error_code ec;
+ std::filesystem::remove(root_ / old_entry.path, ec);
+ if (ec) {
+ logger_->log_error("Failed to delete obsolete asset (id = '{}', path =
'{}')", old_entry.id, old_entry.path.string());
+ } else {
+ logger_->log_info("Successfully deleted obsolete asset (id = '{}',
path = '{}')", old_entry.id, old_entry.path.string());
+ }
}
}
- for (auto& [path, content] : new_file_contents) {
- create_dir((root_ / path).parent_path());
- std::ofstream{root_ / path, std::ios::binary}.write(reinterpret_cast<const
char*>(content.data()), gsl::narrow<std::streamsize>(content.size()));
+ for (auto& asset : new_assets) {
+ auto full_path = root_ / asset.path;
+ if (utils::file::create_dir(full_path.parent_path()) != 0) {
+ logger_->log_error("Failed to create asset directory '{}'",
full_path.parent_path());
+ new_asset_errors += fmt::format("Failed to create asset directory '{}'",
full_path.parent_path()) + "\n";
+ new_state.assets.erase(asset);
+ } else {
+ std::error_code ec;
+ std::filesystem::rename(full_path.string() + ".part", full_path, ec);
+ if (ec) {
+ logger_->log_error("Failed to move temporary asset file '{}' to '{}'",
full_path.string() + ".part", full_path.string());
+ new_asset_errors += fmt::format("Failed to move temporary asset file
'{}' to '{}'", full_path.string() + ".part", full_path.string()) + "\n";
+ new_state.assets.erase(asset);
+ } else {
+ logger_->log_info("Successfully moved temporary asset file '{}' to
'{}'", full_path.string() + ".part", full_path.string());
Review Comment:
Nitpick, but I would change this to
```suggestion
logger_->log_info("Successfully downloaded asset to file '{}'",
full_path.string());
```
since the existence of the `.part` file is an implementation detail.
Alternatively, there could be two separate logs: one in `fetchAssetAsFile()`
saying that it has successfully created the `.part` file, and this one here
about moving it, kept as it is. But I don't think we need to mention the
`.part` file in the logs if everything goes well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]