This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 68e09fa1ba9717127fb9ac9e79da8ae3ff0d92e5 Author: Gabor Gyimesi <[email protected]> AuthorDate: Wed Jul 17 16:01:27 2024 +0200 MINIFICPP-2428 Add targetUris field support for JSON flow config Signed-off-by: Ferenc Gerlits <[email protected]> This closes #1842 --- .../tests/unit/FlowJsonTests.cpp | 106 +++++++++++++++++++++ libminifi/src/core/flow/FlowSchema.cpp | 2 +- .../src/core/flow/StructuredConfiguration.cpp | 14 ++- 3 files changed, 119 insertions(+), 3 deletions(-) diff --git a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp index cdc0cfe9b..c8387386d 100644 --- a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp +++ b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp @@ -922,4 +922,110 @@ TEST_CASE("Test sensitive parameters in sensitive property value sequence") { CHECK(values[1] == "value2"); } +TEST_CASE("NiFi flow json can use alternative targetUris field") { + ConfigurationTestController test_controller; + + core::flow::AdaptiveConfiguration config(test_controller.getContext()); + bool target_uri_valid = true; + + std::string CONFIG_JSON; + SECTION("Use targetUris as a single value") { + CONFIG_JSON = + R"( +{ + "rootGroup": { + "name": "MiNiFi Flow", + "processors": [], + "funnels": [], + "connections": [], + "remoteProcessGroups": [{ + "name": "NiFi Flow", + "targetUris": "https://localhost:8090/nifi", + "yieldDuration": "6 sec", + "communicationsTimeout": "19 sec", + "inputPorts": [{ + "identifier": "00000000-0000-0000-0000-000000000003", + "name": "AmazingInputPort", + "targetId": "00000000-0000-0000-0000-000000000005", + "concurrentlySchedulableTaskCount": 7 + }] + }], + "parameterContextName": "my-context" + } +})"; + } + + SECTION("Use targetUris as an array") { + CONFIG_JSON = + R"( +{ + "rootGroup": { + "name": "MiNiFi Flow", + "processors": [], + "funnels": [], + "connections": [], + "remoteProcessGroups": [{ + "name": "NiFi Flow", + "targetUris": ["https://localhost:8090/nifi", "https://notused:1234/nifi"], + "yieldDuration": "6 sec", + "communicationsTimeout": "19 sec", + "inputPorts": [{ + "identifier": "00000000-0000-0000-0000-000000000003", + "name": "AmazingInputPort", + "targetId": "00000000-0000-0000-0000-000000000005", + "concurrentlySchedulableTaskCount": 7 + }] + }], + "parameterContextName": "my-context" + } +})"; + } + + SECTION("Use targetUris as an empty array") { + target_uri_valid = false; + CONFIG_JSON = + R"( +{ + "rootGroup": { + "name": "MiNiFi Flow", + "processors": [], + "funnels": [], + "connections": [], + "remoteProcessGroups": [{ + "name": "NiFi Flow", + "targetUris": [], + "yieldDuration": "6 sec", + "communicationsTimeout": "19 sec", + "inputPorts": [{ + "identifier": "00000000-0000-0000-0000-000000000003", + "name": "AmazingInputPort", + "targetId": "00000000-0000-0000-0000-000000000005", + "concurrentlySchedulableTaskCount": 7 + }] + }], + "parameterContextName": "my-context" + } +})"; + } + + + std::unique_ptr<core::ProcessGroup> flow = config.getRootFromPayload(CONFIG_JSON); + REQUIRE(flow); + + // verify RPG input port + auto* port = dynamic_cast<minifi::RemoteProcessorGroupPort*>(flow->findProcessorByName("AmazingInputPort")); + REQUIRE(port); + REQUIRE(port->getUUIDStr() == "00000000-0000-0000-0000-000000000003"); + REQUIRE(port->getMaxConcurrentTasks() == 7); + if (target_uri_valid) { + REQUIRE(port->getInstances().size() == 1); + REQUIRE(port->getInstances().front().host_ == "localhost"); + REQUIRE(port->getInstances().front().port_ == 8090); + REQUIRE(port->getInstances().front().protocol_ == "https://"); + } else { + REQUIRE(port->getInstances().empty()); + } + REQUIRE(port->getProperty("Port UUID") == "00000000-0000-0000-0000-000000000005"); +} + } // namespace org::apache::nifi::minifi::test diff --git a/libminifi/src/core/flow/FlowSchema.cpp b/libminifi/src/core/flow/FlowSchema.cpp index 9d378800e..b09b4f539 100644 --- a/libminifi/src/core/flow/FlowSchema.cpp +++ b/libminifi/src/core/flow/FlowSchema.cpp @@ -130,7 +130,7 @@ FlowSchema FlowSchema::getNiFiFlowJson() { .input_ports = {"inputPorts"}, .output_ports = {"outputPorts"}, - .rpg_url = {"targetUri"}, + .rpg_url = {"targetUris", "targetUri"}, .rpg_yield_period = {"yieldDuration"}, .rpg_timeout = {"communicationsTimeout"}, .rpg_local_network_interface = {"localNetworkInterface"}, diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp index b58dbe6d5..954efb394 100644 --- a/libminifi/src/core/flow/StructuredConfiguration.cpp +++ b/libminifi/src/core/flow/StructuredConfiguration.cpp @@ -973,8 +973,7 @@ std::string StructuredConfiguration::getRequiredIdField(const Node& node, std::s std::string StructuredConfiguration::getOptionalField(const Node& node, const std::vector<std::string>& field_name, const std::string& default_value, const std::string& info_message) { std::string infoMessage = info_message; - auto result = node[field_name]; - if (!result) { + auto logInfoMessage = [&]() { if (infoMessage.empty()) { // Build a helpful info message for the user to inform them that a default is being used infoMessage = "Using default value for optional field '" + utils::string::join(",", field_name) + "'"; @@ -986,9 +985,20 @@ std::string StructuredConfiguration::getOptionalField(const Node& node, const st infoMessage += default_value; } logger_->log_info("{}", infoMessage); + }; + auto result = node[field_name]; + if (!result) { + logInfoMessage(); return default_value; } + if (result.isSequence()) { + if (result.empty()) { + logInfoMessage(); + return default_value; + } + return (*result.begin()).getString().value(); + } return result.getString().value(); }
