This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 03eb5df503e89c0e4ed4e88ac71ed9fda83f3af4 Author: Adam Debreceni <[email protected]> AuthorDate: Wed May 7 11:01:38 2025 +0200 MINIFICPP-2564 - Resolve asset path references during flow parsing Signed-off-by: Ferenc Gerlits <[email protected]> Closes #1964 --- libminifi/include/core/FlowConfiguration.h | 3 + libminifi/include/core/ReferenceParser.h | 135 +++++++++++++++++++++ .../include/core/state/nodes/SupportedOperations.h | 2 +- libminifi/include/utils/file/AssetManager.h | 2 + libminifi/src/core/FlowConfiguration.cpp | 1 + .../src/core/flow/StructuredConfiguration.cpp | 16 +++ .../src/core/state/nodes/SupportedOperations.cpp | 35 ++---- libminifi/src/utils/file/AssetManager.cpp | 10 ++ libminifi/test/unit/AssetResolutionTests.cpp | 88 ++++++++++++++ minifi-api/include/minifi-cpp/Exception.h | 3 +- minifi_main/MiNiFiMain.cpp | 7 +- 11 files changed, 273 insertions(+), 29 deletions(-) diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index a85af2c61..331856397 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -39,6 +39,7 @@ #include "core/ProcessSession.h" #include "core/ProcessGroup.h" #include "core/state/nodes/FlowInformation.h" +#include "utils/file/AssetManager.h" #include "utils/file/FileSystem.h" #include "utils/ChecksumCalculator.h" #include "ParameterContext.h" @@ -61,6 +62,7 @@ struct ConfigurationContext { std::optional<std::filesystem::path> path{std::nullopt}; std::shared_ptr<utils::file::FileSystem> filesystem{std::make_shared<utils::file::FileSystem>()}; std::optional<utils::crypto::EncryptionProvider> sensitive_values_encryptor{std::nullopt}; + utils::file::AssetManager* asset_manager{nullptr}; }; enum class FlowSerializationType { Json, NifiJson, Yaml }; @@ -152,6 +154,7 @@ class FlowConfiguration : public CoreComponentImpl { std::shared_ptr<utils::file::FileSystem> filesystem_; utils::crypto::EncryptionProvider 
sensitive_values_encryptor_; utils::ChecksumCalculator checksum_calculator_; + utils::file::AssetManager* asset_manager_{nullptr}; private: virtual std::string serialize(const ProcessGroup&) { return ""; } diff --git a/libminifi/include/core/ReferenceParser.h b/libminifi/include/core/ReferenceParser.h new file mode 100644 index 000000000..95f498289 --- /dev/null +++ b/libminifi/include/core/ReferenceParser.h @@ -0,0 +1,135 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include "minifi-cpp/Exception.h" +#include "utils/file/AssetManager.h" + +namespace org::apache::nifi::minifi::core { + +using IdResolver = std::function<std::optional<std::string>(std::string_view /* category */, std::string_view /* value */)>; + +class MalformedReferenceException : public Exception { + public: + explicit MalformedReferenceException(const std::string &message) : Exception(ExceptionType::FLOW_EXCEPTION, message) {} + + explicit MalformedReferenceException(const char *message) : Exception(ExceptionType::FLOW_EXCEPTION, message) {} +}; + +class UnknownCategoryException : public Exception { + public: + explicit UnknownCategoryException(const std::string &message) : Exception(ExceptionType::FLOW_EXCEPTION, message) {} + + explicit UnknownCategoryException(const char *message) : Exception(ExceptionType::FLOW_EXCEPTION, message) {} +}; + +inline std::string resolveIdentifier(std::string_view str, const std::vector<IdResolver>& resolvers) {  // inline: defined in a header included by several translation units (ODR) + std::string result; + enum class ParseState { + OutsideToken, + InAtMark, + InToken, + InId + } state{ParseState::OutsideToken}; + std::string category; + std::string id; + for (size_t offset = 0; offset < str.length(); ++offset) { + if (state == ParseState::OutsideToken) { + if (str[offset] == '@') { + state = ParseState::InAtMark; + continue; + } + result += str[offset]; + } else if (state == ParseState::InAtMark) { + if (str[offset] == '@') {  // "@@" is an escaped literal '@' + result += '@'; + state = ParseState::OutsideToken; + continue; + } + if (str[offset] == '{') { + state = ParseState::InToken; + category.clear(); + id.clear(); + continue; + } + throw MalformedReferenceException(fmt::format("Malformed reference at {} in '{}'", offset, str)); + } else if (state == ParseState::InToken) { + if (str[offset] == ':') { + state = ParseState::InId; + continue; + } + category += str[offset]; + } else if (state == ParseState::InId) { + if (str[offset] == '}') { + std::optional<std::string> replacement; + for (const auto& resolver : resolvers) {  // first resolver returning a value wins
+ replacement = resolver(category, id); + if (replacement) { + break; + } + } + if (!replacement) { + throw UnknownCategoryException(fmt::format("Could not resolve '{}' in category '{}'", id, category)); + } + result += replacement.value(); + state = ParseState::OutsideToken; + continue; + } + id += str[offset]; + } + } + if (state != ParseState::OutsideToken) { + throw MalformedReferenceException(fmt::format("Found invalid asset reference expression in '{}'", str)); + } + return result; +} + +class AssetException : public Exception { + public: + explicit AssetException(const std::string &message) : Exception(ExceptionType::ASSET_EXCEPTION, message) {} + + explicit AssetException(const char *message) : Exception(ExceptionType::ASSET_EXCEPTION, message) {} +}; + +inline IdResolver getAssetResolver(std::function<std::optional<std::filesystem::path>(std::string_view)> find_asset) {  // handles only the "asset-id" category + return [find_asset = std::move(find_asset)] (std::string_view category, std::string_view id) -> std::optional<std::string> { + if (category != "asset-id") { + return std::nullopt; + } + if (!find_asset) { + throw AssetException(fmt::format("Asset manager is not available, asset path resolution of '{}' is not supported", id)); + } + auto path = find_asset(id); + if (!path) { + throw AssetException(fmt::format("Failed to find asset @{{asset-id:{}}}", id)); + } + return path.value().string(); + }; +} + +inline IdResolver getAssetResolver(utils::file::AssetManager* asset_manager) { + if (!asset_manager) { + return getAssetResolver(std::function<std::optional<std::filesystem::path>(std::string_view)>{nullptr}); + } + return getAssetResolver([asset_manager] (std::string_view id) { + return asset_manager->findAssetById(id); + }); +} + +} // namespace org::apache::nifi::minifi::core diff --git a/libminifi/include/core/state/nodes/SupportedOperations.h b/libminifi/include/core/state/nodes/SupportedOperations.h index 1eaff2a9e..f92cc4608 100644 --- a/libminifi/include/core/state/nodes/SupportedOperations.h +++
b/libminifi/include/core/state/nodes/SupportedOperations.h @@ -50,7 +50,7 @@ class SupportedOperations : public DeviceInformation { } private: - using Metadata = std::unordered_map<std::string, std::vector<std::unordered_map<std::string, std::string>>>; + using Metadata = std::vector<SerializedResponseNode>; template<typename T> static void serializeProperty(SerializedResponseNode& properties, const std::unordered_map<std::string, Metadata>& operand_with_metadata = {}) { diff --git a/libminifi/include/utils/file/AssetManager.h b/libminifi/include/utils/file/AssetManager.h index 93424a145..d7f8c95e2 100644 --- a/libminifi/include/utils/file/AssetManager.h +++ b/libminifi/include/utils/file/AssetManager.h @@ -59,6 +59,8 @@ class AssetManager { std::filesystem::path getRoot() const; + std::optional<std::filesystem::path> findAssetById(std::string_view id) const; + private: void refreshState(); diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index 807102c96..effcbb872 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -37,6 +37,7 @@ FlowConfiguration::FlowConfiguration(ConfigurationContext ctx) service_provider_(std::make_shared<core::controller::StandardControllerServiceProvider>(std::make_unique<core::controller::ControllerServiceNodeMap>(), configuration_)), filesystem_(std::move(ctx.filesystem)), sensitive_values_encryptor_(std::move(ctx.sensitive_values_encryptor.value())), + asset_manager_(ctx.asset_manager), logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) { std::string flowUrl; std::string bucket_id = "default"; diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp index b94624a20..c12d6e0dc 100644 --- a/libminifi/src/core/flow/StructuredConfiguration.cpp +++ b/libminifi/src/core/flow/StructuredConfiguration.cpp @@ -25,6 +25,7 @@ #include "Funnel.h" #include 
"core/ParameterContext.h" #include "core/ParameterTokenParser.h" +#include "core/ReferenceParser.h" #include "core/flow/CheckRequiredField.h" #include "core/flow/StructuredConnectionParser.h" #include "core/state/Value.h" @@ -761,6 +762,10 @@ void StructuredConfiguration::parseRPGPort(const Node& port_node, core::ProcessG } } +static std::string resolveAssetReferences(std::string_view str, utils::file::AssetManager *asset_manager) { + return resolveIdentifier(str, {getAssetResolver(asset_manager)}); +} + void StructuredConfiguration::parsePropertyValueSequence(const std::string& property_name, const Node& property_value_node, core::ConfigurableComponent& component, ParameterContext* parameter_context) { const bool is_sensitive = component.getSupportedProperty(property_name) @@ -786,6 +791,13 @@ void StructuredConfiguration::parsePropertyValueSequence(const std::string& prop throw; } + try { + rawValueString = resolveAssetReferences(rawValueString, asset_manager_); + } catch (const AssetException& e) { + logger_->log_error("Error while resolving asset in property '{}': {}", property_name, e.what()); + throw; + } + logger_->log_debug("Found property {}", property_name); const auto append_prop_result = component.appendProperty(property_name, rawValueString); @@ -825,6 +837,7 @@ std::optional<std::string> StructuredConfiguration::getReplacedParametersValueOr token_parser = std::make_unique<core::NonSensitiveParameterTokenParser>(std::move(property_value_string)); } auto replaced_property_value_string = token_parser->replaceParameters(parameter_context); + replaced_property_value_string = resolveAssetReferences(replaced_property_value_string, asset_manager_); return replaced_property_value_string; } catch (const utils::crypto::EncryptionError& e) { logger_->log_error("Fetching property failed with a decryption error: {}", e.what()); @@ -832,6 +845,9 @@ std::optional<std::string> StructuredConfiguration::getReplacedParametersValueOr } catch (const ParameterException& e) 
{ logger_->log_error("Error while substituting parameters in property '{}': {}", property_name, e.what()); throw; + } catch (const AssetException& e) { + logger_->log_error("Error while resolving asset in property '{}': {}", property_name, e.what()); + throw; } catch (const std::exception& e) { logger_->log_error("Fetching property failed with an exception of {}", e.what()); logger_->log_error("Invalid conversion for field {}. Value {}", property_name, property_value_node.getDebugString()); diff --git a/libminifi/src/core/state/nodes/SupportedOperations.cpp b/libminifi/src/core/state/nodes/SupportedOperations.cpp index 915d36ca5..302ad81d6 100644 --- a/libminifi/src/core/state/nodes/SupportedOperations.cpp +++ b/libminifi/src/core/state/nodes/SupportedOperations.cpp @@ -39,24 +39,11 @@ std::string SupportedOperations::getName() const { } void SupportedOperations::addProperty(SerializedResponseNode& properties, const std::string& operand, const Metadata& metadata) { - SerializedResponseNode operand_node{.name = operand, .keep_empty = true}; - - for (const auto& [key, value_array] : metadata) { - SerializedResponseNode metadata_item{.name = key, .array = true}; - for (const auto& value_object : value_array) { - SerializedResponseNode value_child; - for (const auto& pair: value_object) { - value_child.children.push_back({.name = pair.first, .value = pair.second}); - } - metadata_item.children.push_back(value_child); - } - operand_node.children.push_back(metadata_item); - } - properties.children.push_back(operand_node); + properties.children.push_back(SerializedResponseNode{.name = operand, .keep_empty = true, .children = metadata}); } SupportedOperations::Metadata SupportedOperations::buildUpdatePropertiesMetadata() const { - std::vector<std::unordered_map<std::string, std::string>> supported_config_updates; + SerializedResponseNode supported_config_updates{.name = "availableProperties", .array = true}; auto sensitive_properties = 
Configuration::getSensitiveProperties(configuration_reader_); auto updatable_not_sensitive_configuration_properties = minifi::Configuration::CONFIGURATION_PROPERTIES | ranges::views::filter([&](const auto& configuration_property) { const auto& configuration_property_name = configuration_property.first; @@ -65,19 +52,17 @@ SupportedOperations::Metadata SupportedOperations::buildUpdatePropertiesMetadata }); for (const auto& [config_property_name, config_property_validator] : updatable_not_sensitive_configuration_properties) { - std::unordered_map<std::string, std::string> property; - property.emplace("propertyName", config_property_name); - property.emplace("validator", config_property_validator->getEquivalentNifiStandardValidatorName().value_or("VALID")); + SerializedResponseNode property; + property.children.push_back({.name = "propertyName", .value = std::string{config_property_name}}); + property.children.push_back({.name = "validator", .value = std::string{config_property_validator->getEquivalentNifiStandardValidatorName().value_or("VALID")}}); if (configuration_reader_) { if (auto property_value = configuration_reader_(std::string(config_property_name))) { - property.emplace("propertyValue", *property_value); + property.children.push_back({.name = "propertyValue", .value = *property_value}); } } - supported_config_updates.push_back(property); + supported_config_updates.children.push_back(property); } - Metadata available_properties; - available_properties.emplace("availableProperties", supported_config_updates); - return available_properties; + return {supported_config_updates}; } void SupportedOperations::fillProperties(SerializedResponseNode& properties, minifi::c2::Operation operation) const { @@ -111,7 +96,9 @@ void SupportedOperations::fillProperties(SerializedResponseNode& properties, min break; } case minifi::c2::Operation::sync: { - serializeProperty<minifi::c2::SyncOperand>(properties); + std::unordered_map<std::string, Metadata> operand_with_metadata; 
+ operand_with_metadata.emplace("resource", Metadata{SerializedResponseNode{.name = "resolveAssetReferences", .value = true}}); + serializeProperty<minifi::c2::SyncOperand>(properties, operand_with_metadata); break; } default: diff --git a/libminifi/src/utils/file/AssetManager.cpp b/libminifi/src/utils/file/AssetManager.cpp index 3444cb55c..73f052de6 100644 --- a/libminifi/src/utils/file/AssetManager.cpp +++ b/libminifi/src/utils/file/AssetManager.cpp @@ -187,4 +187,14 @@ std::filesystem::path AssetManager::getRoot() const { return root_; } +std::optional<std::filesystem::path> AssetManager::findAssetById(std::string_view id) const { + std::lock_guard lock(mtx_); + for (auto& asset : state_.assets) { + if (asset.id == id) { + return root_ / asset.path; + } + } + return std::nullopt; +} + } // namespace org::apache::nifi::minifi::utils::file diff --git a/libminifi/test/unit/AssetResolutionTests.cpp b/libminifi/test/unit/AssetResolutionTests.cpp new file mode 100644 index 000000000..2cf2891bb --- /dev/null +++ b/libminifi/test/unit/AssetResolutionTests.cpp @@ -0,0 +1,88 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "unit/TestBase.h" +#include "unit/Catch.h" +#include "core/ReferenceParser.h" + +namespace org::apache::nifi::minifi::test { + +static std::string resolve(const std::string& str) { + return core::resolveIdentifier(str, {core::getAssetResolver([] (std::string_view id) -> std::optional<std::filesystem::path> { + if (id == "apple") { + return "/home/user/apple.txt"; + } + if (id == "banana") { + return "/home/user/banana.txt"; + } + if (id == "471deef6-2a6e-4a7d-912a-81cc17e3a204") { + return "/home/ai_model.txt"; + } + return std::nullopt; + })}); +} + +TEST_CASE("Non-references are preserved") { + const std::string str = "property #{this} other {{nothing}} else"; + const auto result = resolve(str); + REQUIRE(result == str); +} + +TEST_CASE("Can escape '@'") { + REQUIRE(resolve("property @@{this} other {{nothing}} else@@") == "property @{this} other {{nothing}} else@"); +} + +TEST_CASE("Throw on unterminated pattern") { + REQUIRE_THROWS_AS(resolve("something @"), core::MalformedReferenceException); + REQUIRE_THROWS_AS(resolve("something @{"), core::MalformedReferenceException); + REQUIRE_THROWS_AS(resolve("something @{}"), core::MalformedReferenceException); + REQUIRE_THROWS_AS(resolve("something @{:"), core::MalformedReferenceException); +} + +TEST_CASE("Resolve existing asset") { + REQUIRE(resolve("something @{asset-id:apple} @{asset-id:banana}") == "something /home/user/apple.txt /home/user/banana.txt"); + REQUIRE(resolve("@{asset-id:apple}") == "/home/user/apple.txt"); + REQUIRE(resolve("@{asset-id:471deef6-2a6e-4a7d-912a-81cc17e3a204}") == "/home/ai_model.txt"); +} + +TEST_CASE("Throw on non-existing asset") { + REQUIRE_THROWS_AS(resolve("something @{asset-id:kiwi}"), core::AssetException); +} + +TEST_CASE("Throw on non-existing category") { + REQUIRE_THROWS_AS(resolve("something @{drink:cider}"), core::UnknownCategoryException); +} + +TEST_CASE("Resolve more categories") { + auto result = core::resolveIdentifier("@{drink:cider} 
@{asset-id:apple}", {core::getAssetResolver([] (std::string_view id) -> std::optional<std::filesystem::path> { + if (id == "apple") { + return "/home/user/apple.txt"; + } + return std::nullopt; + }), [] (std::string_view category, std::string_view id) -> std::optional<std::string> { + if (category != "drink") { + return std::nullopt; + } + if (id == "cider") { + return "yes please"; + } + return std::nullopt; + }}); + REQUIRE(result == "yes please /home/user/apple.txt"); +} + +} // namespace org::apache::nifi::minifi::test diff --git a/minifi-api/include/minifi-cpp/Exception.h b/minifi-api/include/minifi-cpp/Exception.h index 77756272f..70fe0a885 100644 --- a/minifi-api/include/minifi-cpp/Exception.h +++ b/minifi-api/include/minifi-cpp/Exception.h @@ -43,11 +43,12 @@ enum ExceptionType { REGEX_EXCEPTION, REPOSITORY_EXCEPTION, PARAMETER_EXCEPTION, + ASSET_EXCEPTION, MAX_EXCEPTION }; static const char *ExceptionStr[MAX_EXCEPTION] = { "File Operation", "Flow File Operation", "Processor Operation", "Process Session Operation", "Process Schedule Operation", "Site2Site Protocol", - "General Operation", "Regex Operation", "Repository Operation", "Parameter Operation" }; + "General Operation", "Regex Operation", "Repository Operation", "Parameter Operation", "Asset Operation" }; inline const char *ExceptionTypeToString(ExceptionType type) { if (type < MAX_EXCEPTION) diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp index facde3b03..effcc6681 100644 --- a/minifi_main/MiNiFiMain.cpp +++ b/minifi_main/MiNiFiMain.cpp @@ -394,6 +394,8 @@ int main(int argc, char **argv) { should_encrypt_flow_config, utils::crypto::EncryptionProvider::create(minifiHome)); + auto asset_manager = std::make_unique<utils::file::AssetManager>(*configure); + std::shared_ptr<core::FlowConfiguration> flow_configuration = core::createFlowConfiguration( core::ConfigurationContext{ .flow_file_repo = flow_repo, @@ -401,11 +403,10 @@ int main(int argc, char **argv) { .configuration = 
configure, .path = configure->get(minifi::Configure::nifi_flow_configuration_file), .filesystem = filesystem, - .sensitive_values_encryptor = utils::crypto::EncryptionProvider::createSensitivePropertiesEncryptor(minifiHome) + .sensitive_values_encryptor = utils::crypto::EncryptionProvider::createSensitivePropertiesEncryptor(minifiHome), + .asset_manager = asset_manager.get() }, nifi_configuration_class_name); - auto asset_manager = std::make_unique<utils::file::AssetManager>(*configure); - std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repo_metric_sources{prov_repo, flow_repo, content_repo}; auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configure, repo_metric_sources, flow_configuration, asset_manager.get()); const auto controller = std::make_unique<minifi::FlowController>(
