This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit ce288b920d2ac6dd1a445524b81aaf88baa501a9 Author: Adam Debreceni <[email protected]> AuthorDate: Fri Feb 17 01:09:38 2023 +0100 MINIFICPP-2035 NiFi flow json format support Closes #1494 Signed-off-by: Marton Szasz <[email protected]> --- .../integration/MiNiFi_integration_test_driver.py | 3 + docker/test/integration/cluster/ContainerStore.py | 3 + .../test/integration/cluster/DockerTestCluster.py | 3 + .../cluster/containers/MinifiContainer.py | 9 +- docker/test/integration/features/s2s.feature | 7 +- docker/test/integration/features/script.feature | 3 +- docker/test/integration/features/sql.feature | 3 +- .../Minifi_flow_json_serializer.py | 141 +++++++++++++ docker/test/integration/steps/steps.py | 5 + extensions/http-curl/protocols/RESTSender.cpp | 1 + .../tests/unit/AdaptiveConfigurationTests.cpp | 66 ++++++ .../tests/unit/FlowJsonTests.cpp | 163 +++++++++++++++ .../tests/unit/YamlConfigurationTests.cpp | 153 +++----------- libminifi/CMakeLists.txt | 2 +- libminifi/include/RemoteProcessorGroupPort.h | 4 + libminifi/include/core/ProcessGroup.h | 2 - libminifi/include/core/ProcessorConfig.h | 6 +- .../AdaptiveConfiguration.h} | 31 +-- libminifi/include/core/flow/CheckRequiredField.h | 6 +- libminifi/include/core/flow/FlowSchema.h | 88 ++++++++ libminifi/include/core/flow/Node.h | 59 +++++- .../include/core/flow/StructuredConfiguration.h | 28 ++- .../include/core/flow/StructuredConnectionParser.h | 10 +- libminifi/include/core/json/JsonNode.h | 19 ++ .../include/core/state/nodes/SchedulingNodes.h | 4 +- libminifi/include/core/yaml/YamlConfiguration.h | 9 - libminifi/include/core/yaml/YamlNode.h | 16 +- libminifi/src/core/ConfigurationFactory.cpp | 14 +- libminifi/src/core/ProcessGroup.cpp | 6 +- libminifi/src/core/flow/AdaptiveConfiguration.cpp | 67 ++++++ libminifi/src/core/flow/CheckRequiredField.cpp | 18 +- libminifi/src/core/flow/FlowSchema.cpp | 144 +++++++++++++ libminifi/src/core/flow/Node.cpp | 10 +- .../src/core/flow/StructuredConfiguration.cpp | 
227 +++++++++++---------- .../src/core/flow/StructuredConnectionParser.cpp | 29 ++- libminifi/src/core/json/JsonConfiguration.cpp | 89 -------- libminifi/src/core/yaml/YamlConfiguration.cpp | 24 +-- libminifi/test/ConfigurationTestController.h | 56 +++++ minifi_main/MiNiFiMain.cpp | 2 +- 39 files changed, 1076 insertions(+), 454 deletions(-) diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py index e3153dffe..21dec70b4 100644 --- a/docker/test/integration/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/MiNiFi_integration_test_driver.py @@ -315,3 +315,6 @@ class MiNiFi_integration_test: def enable_sql_in_minifi(self): self.cluster.enable_sql_in_minifi() + + def set_yaml_in_minifi(self): + self.cluster.set_yaml_in_minifi() diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py index 867e41076..fb5f9c6b8 100644 --- a/docker/test/integration/cluster/ContainerStore.py +++ b/docker/test/integration/cluster/ContainerStore.py @@ -160,6 +160,9 @@ class ContainerStore: def enable_sql_in_minifi(self): self.minifi_options.enable_sql = True + def set_yaml_in_minifi(self): + self.minifi_options.config_format = "yaml" + def get_startup_finished_log_entry(self, container_name): return self.containers[container_name].get_startup_finished_log_entry() diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py index 9688c732e..0a07119a5 100644 --- a/docker/test/integration/cluster/DockerTestCluster.py +++ b/docker/test/integration/cluster/DockerTestCluster.py @@ -85,6 +85,9 @@ class DockerTestCluster: def enable_sql_in_minifi(self): self.container_store.enable_sql_in_minifi() + def set_yaml_in_minifi(self): + self.container_store.set_yaml_in_minifi() + def get_app_log(self, container_name): log_source = self.container_store.log_source(container_name) if log_source == 
LogSource.FROM_DOCKER_CONTAINER: diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py b/docker/test/integration/cluster/containers/MinifiContainer.py index 93d65ddda..7f70d4fb7 100644 --- a/docker/test/integration/cluster/containers/MinifiContainer.py +++ b/docker/test/integration/cluster/containers/MinifiContainer.py @@ -21,6 +21,7 @@ import shutil import copy from .FlowContainer import FlowContainer from minifi.flow_serialization.Minifi_flow_yaml_serializer import Minifi_flow_yaml_serializer +from minifi.flow_serialization.Minifi_flow_json_serializer import Minifi_flow_json_serializer class MinifiOptions: @@ -30,6 +31,7 @@ class MinifiOptions: self.enable_provenance = False self.enable_prometheus = False self.enable_sql = False + self.config_format = "json" class MinifiContainer(FlowContainer): @@ -58,7 +60,12 @@ class MinifiContainer(FlowContainer): return "Starting Flow Controller" def _create_config(self): - serializer = Minifi_flow_yaml_serializer() + if self.options.config_format == "yaml": + serializer = Minifi_flow_yaml_serializer() + elif self.options.config_format == "json": + serializer = Minifi_flow_json_serializer() + else: + assert False, "Invalid flow configuration format: {}".format(self.options.config_format) test_flow_yaml = serializer.serialize(self.start_nodes, self.controllers) logging.info('Using generated flow config yml:\n%s', test_flow_yaml) with open(os.path.join(self.container_specific_config_dir, "config.yml"), 'wb') as config_file: diff --git a/docker/test/integration/features/s2s.feature b/docker/test/integration/features/s2s.feature index a836a50c5..a51589a02 100644 --- a/docker/test/integration/features/s2s.feature +++ b/docker/test/integration/features/s2s.feature @@ -35,7 +35,8 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds Scenario: Zero length files are transfered between via s2s if the 
"drop empty" connection property is false - Given a GenerateFlowFile processor with the "File Size" property set to "0B" + Given a MiNiFi CPP server with yaml config + And a GenerateFlowFile processor with the "File Size" property set to "0B" And a RemoteProcessGroup node opened on "http://nifi:8080/nifi" And the "success" relationship of the GenerateFlowFile processor is connected to the input port on the RemoteProcessGroup @@ -47,7 +48,9 @@ Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol Then at least one empty flowfile is placed in the monitored directory in less than 90 seconds Scenario: Zero length files are not transfered between via s2s if the "drop empty" connection property is true - Given a GenerateFlowFile processor with the "File Size" property set to "0B" + # "drop empty" is only supported with yaml config + Given a MiNiFi CPP server with yaml config + And a GenerateFlowFile processor with the "File Size" property set to "0B" And a RemoteProcessGroup node opened on "http://nifi:8080/nifi" And the "success" relationship of the GenerateFlowFile processor is connected to the input port on the RemoteProcessGroup And the connection going to the RemoteProcessGroup has "drop empty" set diff --git a/docker/test/integration/features/script.feature b/docker/test/integration/features/script.feature index 4a31b3c42..b8ca2e5f6 100644 --- a/docker/test/integration/features/script.feature +++ b/docker/test/integration/features/script.feature @@ -29,7 +29,8 @@ Feature: MiNiFi can execute Lua and Python scripts Then the Minifi logs contain the following message: "Sleeping forever" 3 times after 5 seconds Scenario: ExecuteScript should only allow one Python script running at a time - Given a GenerateFlowFile processor with the "File Size" property set to "0B" + Given a MiNiFi CPP server with yaml config + And a GenerateFlowFile processor with the "File Size" property set to "0B" And the scheduling period of the GenerateFlowFile processor is set to "500 
ms" And a ExecuteScript processor with the "Script File" property set to "/tmp/resources/python/sleep_forever.py" And the "Script Engine" property of the ExecuteScript processor is set to "python" diff --git a/docker/test/integration/features/sql.feature b/docker/test/integration/features/sql.feature index 768ed4d7e..bafab8e95 100644 --- a/docker/test/integration/features/sql.feature +++ b/docker/test/integration/features/sql.feature @@ -33,7 +33,8 @@ Feature: Executing SQL operations from MiNiFi-C++ Then the query "SELECT * FROM test_table WHERE int_col = 42" returns 1 rows in less than 120 seconds on the PostgreSQL server Scenario: A MiNiFi instance can query to test table with ExecuteSQL processor - Given a GenerateFlowFile processor with the "File Size" property set to "0B" + Given a MiNiFi CPP server with yaml config + And a GenerateFlowFile processor with the "File Size" property set to "0B" And a UpdateAttribute processor with the "sql.args.1.value" property set to "apple" And the "sql.args.2.value" property of the UpdateAttribute processor is set to "banana" And a ExecuteSQL processor with the "SQL select query" property set to "SELECT * FROM test_table WHERE text_col = ? OR text_col = ? ORDER BY int_col DESC" diff --git a/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py new file mode 100644 index 000000000..4468a6107 --- /dev/null +++ b/docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py @@ -0,0 +1,141 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import uuid +import json + +from ..core.Processor import Processor +from ..core.InputPort import InputPort +from ..core.Funnel import Funnel + + +class Minifi_flow_json_serializer: + def serialize(self, start_nodes, controllers): + res = { + 'rootGroup': { + 'name': 'MiNiFi Flow', + 'processors': [], + 'funnels': [], + 'connections': [], + 'remoteProcessGroups': [], + 'controllerServices': [] + } + } + visited = [] + + for node in start_nodes: + self.serialize_node(node, res['rootGroup'], visited) + + for controller in controllers: + self.serialize_controller(controller, res['rootGroup']) + + return json.dumps(res) + + def serialize_node(self, connectable, root, visited): + visited.append(connectable) + + if hasattr(connectable, 'name'): + connectable_name = connectable.name + else: + connectable_name = str(connectable.uuid) + + if isinstance(connectable, InputPort): + group = connectable.remote_process_group + res_group = None + + for res_group_candidate in root['remoteProcessGroups']: + assert isinstance(res_group_candidate, dict) + if res_group_candidate['identifier'] == str(group.uuid): + res_group = res_group_candidate + + if res_group is None: + res_group = { + 'name': group.name, + 'identifier': str(group.uuid), + 'targetUri': group.url, + 'communicationsTimeout': '30 sec', + 'yieldDuration': '3 sec', + 'inputPorts': [] + } + + root['remoteProcessGroups'].append(res_group) + + res_group['inputPorts'].append({ + 'identifier': str(connectable.uuid), + 'name': connectable.name + }) + + if isinstance(connectable, Processor): + 
root['processors'].append({ + 'name': connectable_name, + 'identifier': str(connectable.uuid), + 'type': connectable.class_prefix + connectable.clazz, + 'schedulingStrategy': connectable.schedule['scheduling strategy'], + 'schedulingPeriod': connectable.schedule['scheduling period'], + 'penaltyDuration': connectable.schedule['penalization period'], + 'yieldDuration': connectable.schedule['yield period'], + 'runDurationMillis': connectable.schedule['run duration nanos'], + 'properties': connectable.properties, + 'autoTerminatedRelationships': connectable.auto_terminate, + 'concurrentlySchedulableTaskCount': connectable.max_concurrent_tasks + }) + + for svc in connectable.controller_services: + if svc in visited: + continue + + visited.append(svc) + root['controllerServices'].append({ + 'name': svc.name, + 'identifier': svc.id, + 'type': svc.service_class, + 'properties': svc.properties + }) + + if isinstance(connectable, Funnel): + root['funnels'].append({ + 'identifier': str(connectable.uuid) + }) + + for conn_name in connectable.connections: + conn_procs = connectable.connections[conn_name] + + if not isinstance(conn_procs, list): + conn_procs = [conn_procs] + + for proc in conn_procs: + root['connections'].append({ + 'name': str(uuid.uuid4()), + 'source': {'id': str(connectable.uuid)}, + 'destination': {'id': str(proc.uuid)} + }) + if (all(str(connectable.uuid) != x['identifier'] for x in root['funnels'])): + root['connections'][-1]['selectedRelationships'] = [conn_name] + if proc not in visited: + self.serialize_node(proc, root, visited) + + def serialize_controller(self, controller, root): + if hasattr(controller, 'name'): + connectable_name = controller.name + else: + connectable_name = str(controller.uuid) + + root['controllerServices'].append({ + 'name': connectable_name, + 'identifier': controller.id, + 'type': controller.service_class, + 'properties': controller.properties + }) diff --git a/docker/test/integration/steps/steps.py 
b/docker/test/integration/steps/steps.py index 686c33ebe..cce5eaf30 100644 --- a/docker/test/integration/steps/steps.py +++ b/docker/test/integration/steps/steps.py @@ -1037,3 +1037,8 @@ def step_impl(context, size: str, duration: str) -> None: @then(u'the memory usage of the agent decreases to {peak_usage_percent}% peak usage in less than {duration}') def step_impl(context, peak_usage_percent: str, duration: str) -> None: context.test.check_memory_usage_compared_to_peak(float(peak_usage_percent) * 0.01, humanfriendly.parse_timespan(duration)) + + +@given(u'a MiNiFi CPP server with yaml config') +def step_impl(context): + context.test.set_yaml_in_minifi() diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp index b58bf3233..ad3036cbe 100644 --- a/extensions/http-curl/protocols/RESTSender.cpp +++ b/extensions/http-curl/protocols/RESTSender.cpp @@ -162,6 +162,7 @@ C2Payload RESTSender::sendPayload(const std::string& url, const Direction direct if (payload.getOperation() == Operation::TRANSFER) { auto read = std::make_unique<utils::HTTPReadCallback>(std::numeric_limits<size_t>::max()); client.setReadCallback(std::move(read)); + client.setRequestHeader("Accept", "application/vnd.minifi-c2+json;version=1, text/yml"); } else { // Due to a bug in MiNiFi C2 the Accept header is not handled properly thus we need to exclude it to be compatible // TODO(lordgamez): The header should be re-added when the issue in MiNiFi C2 is fixed: https://issues.apache.org/jira/browse/NIFI-10535 diff --git a/extensions/standard-processors/tests/unit/AdaptiveConfigurationTests.cpp b/extensions/standard-processors/tests/unit/AdaptiveConfigurationTests.cpp new file mode 100644 index 000000000..7bf8ddd27 --- /dev/null +++ b/extensions/standard-processors/tests/unit/AdaptiveConfigurationTests.cpp @@ -0,0 +1,66 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "TestBase.h" +#include "Catch.h" +#include "ConfigurationTestController.h" +#include "core/flow/AdaptiveConfiguration.h" + +TEST_CASE("Adaptive configuration can parse JSON") { + ConfigurationTestController controller; + + const char* json_config = R"( + { + "Flow Controller": {"name": "root"}, + "Processors": [ + { + "id": "00000000-0000-0000-0000-000000000001", + "class": "DummyProcessor", + "name": "Proc1" + } + ], + "Connections": [] + } + )"; + + core::flow::AdaptiveConfiguration config{controller.getContext()}; + + auto root = config.getRootFromPayload(json_config); + + REQUIRE(root->findProcessorByName("Proc1")); +} + +TEST_CASE("Adaptive configuration can parse YAML") { + ConfigurationTestController controller; + + const char* yaml_config = R"( +Flow Controller: + name: root +Processors: +- id: 00000000-0000-0000-0000-000000000001 + class: DummyProcessor + name: Proc1 +Connections: [] + )"; + + core::flow::AdaptiveConfiguration config{controller.getContext()}; + + auto root = config.getRootFromPayload(yaml_config); + + REQUIRE(root->findProcessorByName("Proc1")); +} diff --git a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp new file mode 100644 index 000000000..b7d1a47f8 --- 
/dev/null +++ b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp @@ -0,0 +1,163 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <map> +#include <memory> +#include <chrono> +#include "core/repository/VolatileContentRepository.h" +#include "core/ProcessGroup.h" +#include "core/RepositoryFactory.h" +#include "core/yaml/YamlConfiguration.h" +#include "TailFile.h" +#include "TestBase.h" +#include "Catch.h" +#include "utils/TestUtils.h" +#include "utils/StringUtils.h" +#include "ConfigurationTestController.h" +#include "Funnel.h" + +using namespace std::literals::chrono_literals; + +TEST_CASE("NiFi flow json format is correctly parsed") { + ConfigurationTestController test_controller; + + core::flow::AdaptiveConfiguration config(test_controller.getContext()); + + static const std::string CONFIG_JSON = + R"( +{ + "rootGroup": { + "name": "MiNiFi Flow", + "processors": [{ + "identifier": "00000000-0000-0000-0000-000000000001", + "name": "MyGenFF", + "type": "org.apache.nifi.processors.standard.GenerateFlowFile", + "concurrentlySchedulableTaskCount": 15, + "schedulingStrategy": "TIMER_DRIVEN", + "schedulingPeriod": "3 sec", + "penaltyDuration": "12 sec", + "yieldDuration": "4 sec", + 
"runDurationMillis": 12, + "autoTerminatedRelationships": ["one", "two"], + "properties": { + "File Size": "10 B", + "Batch Size": 12 + } + }], + "funnels": [{ + "identifier": "00000000-0000-0000-0000-000000000010", + "name": "CoolFunnel" + }], + "connections": [{ + "identifier": "00000000-0000-0000-0000-000000000002", + "name": "GenToFunnel", + "source": { + "id": "00000000-0000-0000-0000-000000000001", + "name": "MyGenFF" + }, + "destination": { + "id": "00000000-0000-0000-0000-000000000010", + "name": "CoolFunnel" + }, + "selectedRelationships": ["a", "b"], + "backPressureObjectThreshold": 7, + "backPressureDataSizeThreshold": "11 KB", + "flowFileExpiration": "13 sec" + }, { + "identifier": "00000000-0000-0000-0000-000000000008", + "name": "FunnelToS2S", + "source": { + "id": "00000000-0000-0000-0000-000000000010", + "name": "CoolFunnel" + }, + "destination": { + "id": "00000000-0000-0000-0000-000000000003", + "name": "AmazingInputPort" + }, + "selectedRelationships": ["success"] + }], + "remoteProcessGroups": [{ + "name": "NiFi Flow", + "targetUri": "https://localhost:8090/nifi", + "yieldDuration": "6 sec", + "communicationsTimeout": "19 sec", + "inputPorts": [{ + "identifier": "00000000-0000-0000-0000-000000000003", + "name": "AmazingInputPort", + "targetId": "00000000-0000-0000-0000-000000000005", + "concurrentlySchedulableTaskCount": 7 + }] + }] + } +})"; + + std::unique_ptr<core::ProcessGroup> flow = config.getRootFromPayload(CONFIG_JSON); + REQUIRE(flow); + + // verify processor + auto* proc = flow->findProcessorByName("MyGenFF"); + REQUIRE(proc); + REQUIRE(proc->getUUIDStr() == "00000000-0000-0000-0000-000000000001"); + REQUIRE(15 == proc->getMaxConcurrentTasks()); + REQUIRE(core::SchedulingStrategy::TIMER_DRIVEN == proc->getSchedulingStrategy()); + REQUIRE(3s == proc->getSchedulingPeriodNano()); + REQUIRE(12s == proc->getPenalizationPeriod()); + REQUIRE(4s == proc->getYieldPeriodMsec()); + REQUIRE(proc->isAutoTerminated({"one", ""})); + 
REQUIRE(proc->isAutoTerminated({"two", ""})); + REQUIRE_FALSE(proc->isAutoTerminated({"three", ""})); + REQUIRE(proc->getProperty("File Size") == "10 B"); + REQUIRE(proc->getProperty("Batch Size") == "12"); + + // verify funnel + auto* funnel = dynamic_cast<minifi::Funnel*>(flow->findProcessorByName("CoolFunnel")); + REQUIRE(funnel); + REQUIRE(funnel->getUUIDStr() == "00000000-0000-0000-0000-000000000010"); + + // verify RPG input port + auto* port = dynamic_cast<minifi::RemoteProcessorGroupPort*>(flow->findProcessorByName("AmazingInputPort")); + REQUIRE(port); + REQUIRE(port->getUUIDStr() == "00000000-0000-0000-0000-000000000003"); + REQUIRE(port->getMaxConcurrentTasks() == 7); + REQUIRE(port->getInstances().size() == 1); + REQUIRE(port->getInstances().front().host_ == "localhost"); + REQUIRE(port->getInstances().front().port_ == 8090); + REQUIRE(port->getInstances().front().protocol_ == "https://"); + REQUIRE(port->getProperty("Port UUID") == "00000000-0000-0000-0000-000000000005"); + + // verify connection + std::map<std::string, minifi::Connection*> connection_map; + flow->getConnections(connection_map); + REQUIRE(4 == connection_map.size()); + auto connection1 = connection_map.at("00000000-0000-0000-0000-000000000002"); + REQUIRE(connection1); + REQUIRE("GenToFunnel" == connection1->getName()); + REQUIRE(connection1->getSource() == proc); + REQUIRE(connection1->getDestination() == funnel); + REQUIRE(connection1->getRelationships() == (std::set<core::Relationship>{{"a", ""}, {"b", ""}})); + REQUIRE(connection1->getMaxQueueSize() == 7); + REQUIRE(connection1->getMaxQueueDataSize() == 11_KiB); + REQUIRE(13s == connection1->getFlowExpirationDuration()); + + auto connection2 = connection_map.at("00000000-0000-0000-0000-000000000008"); + REQUIRE(connection2); + REQUIRE("FunnelToS2S" == connection2->getName()); + REQUIRE(connection2->getSource() == funnel); + REQUIRE(connection2->getDestination() == port); + REQUIRE(connection2->getRelationships() == 
(std::set<core::Relationship>{{"success", ""}})); +} diff --git a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp index b14ee1f83..651480578 100644 --- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp +++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp @@ -28,17 +28,14 @@ #include "Catch.h" #include "utils/TestUtils.h" #include "utils/StringUtils.h" +#include "ConfigurationTestController.h" using namespace std::literals::chrono_literals; TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") { - TestController test_controller; + ConfigurationTestController test_controller; - std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository"); - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration); - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration}); + core::YamlConfiguration yamlConfig(test_controller.getContext()); SECTION("loading YAML without optional component IDs works") { static const std::string CONFIG_YAML_WITHOUT_IDS = @@ -222,13 +219,9 @@ Provenance Reporting: } TEST_CASE("Test YAML v3 Invalid Type", "[YamlConfiguration3]") { - TestController test_controller; + ConfigurationTestController test_controller; - std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository"); - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration); - std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>(); - core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration}); + core::YamlConfiguration yamlConfig(test_controller.getContext()); static const std::string TEST_CONFIG_YAML = R"( @@ -347,13 +340,9 @@ NiFi Properties Overrides: {} } TEST_CASE("Test YAML v3 Config Processing", "[YamlConfiguration3]") { - TestController test_controller; + ConfigurationTestController test_controller; - std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository"); - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration); - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration}); + core::YamlConfiguration yamlConfig(test_controller.getContext()); static const std::string TEST_CONFIG_YAML = R"( @@ -496,17 +485,9 @@ NiFi Properties Overrides: {} } TEST_CASE("Test Dynamic Unsupported", "[YamlConfigurationDynamicUnsupported]") { - TestController test_controller; + ConfigurationTestController test_controller; - LogTestController &logTestController = LogTestController::getInstance(); - logTestController.setDebug<TestPlan>(); - logTestController.setTrace<core::YamlConfiguration>(); - - std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository"); - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration); - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - core::YamlConfiguration 
yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration}); + core::YamlConfiguration yamlConfig(test_controller.getContext()); static const std::string TEST_CONFIG_YAML = R"( Flow Controller: @@ -531,17 +512,9 @@ Processors: } TEST_CASE("Test Required Property", "[YamlConfigurationRequiredProperty]") { - TestController test_controller; - - LogTestController &logTestController = LogTestController::getInstance(); - logTestController.setDebug<TestPlan>(); - logTestController.setDebug<core::YamlConfiguration>(); + ConfigurationTestController test_controller; - std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository"); - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration); - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration}); + core::YamlConfiguration yamlConfig(test_controller.getContext()); static const std::string TEST_CONFIG_YAML = R"( Flow Controller: @@ -567,25 +540,16 @@ Processors: } catch (const std::exception &e) { caught_exception = true; REQUIRE("Unable to parse configuration file for component named 'XYZ' because required property " - "'Input Directory' is not set [in 'Processors' section of configuration file]" == std::string(e.what())); + "'Input Directory' is not set [in '/Processors/0/Properties' section of configuration file]" == std::string(e.what())); } REQUIRE(caught_exception); } TEST_CASE("Test Required Property 2", "[YamlConfigurationRequiredProperty2]") { - TestController test_controller; + ConfigurationTestController test_controller; - LogTestController &logTestController = LogTestController::getInstance(); - logTestController.setDebug<TestPlan>(); - 
logTestController.setDebug<core::YamlConfiguration>(); - logTestController.setDebug<core::Processor>(); - - std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository"); - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration); - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration}); + core::YamlConfiguration yamlConfig(test_controller.getContext()); static const std::string TEST_CONFIG_YAML = R"( Flow Controller: @@ -623,17 +587,9 @@ class DummyComponent : public core::ConfigurableComponent { }; TEST_CASE("Test Dependent Property", "[YamlConfigurationDependentProperty]") { - TestController test_controller; - - LogTestController &logTestController = LogTestController::getInstance(); - logTestController.setDebug<TestPlan>(); - logTestController.setDebug<core::YamlConfiguration>(); + ConfigurationTestController test_controller; - std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository"); - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration); - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration}); + core::YamlConfiguration yamlConfig(test_controller.getContext()); const auto component = std::make_shared<DummyComponent>(); component->setSupportedProperties(std::array{ core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }), @@ -644,17 +600,9 @@ 
TEST_CASE("Test Dependent Property", "[YamlConfigurationDependentProperty]") { } TEST_CASE("Test Dependent Property 2", "[YamlConfigurationDependentProperty2]") { - TestController test_controller; + ConfigurationTestController test_controller; - LogTestController &logTestController = LogTestController::getInstance(); - logTestController.setDebug<TestPlan>(); - logTestController.setDebug<core::YamlConfiguration>(); - - std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository"); - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration); - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration}); + core::YamlConfiguration yamlConfig(test_controller.getContext()); const auto component = std::make_shared<DummyComponent>(); component->setSupportedProperties(std::array{ core::Property("Prop A", "Prop A desc", "", false, "", { }, { }), @@ -673,16 +621,9 @@ TEST_CASE("Test Dependent Property 2", "[YamlConfigurationDependentProperty2]") } TEST_CASE("Test Exclusive Property", "[YamlConfigurationExclusiveProperty]") { - TestController test_controller; - - LogTestController &logTestController = LogTestController::getInstance(); - logTestController.setDebug<TestPlan>(); - logTestController.setDebug<core::YamlConfiguration>(); - std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository"); - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration); - std::shared_ptr<core::ContentRepository> content_repo = 
std::make_shared<core::repository::VolatileContentRepository>(); - core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration}); + ConfigurationTestController test_controller; + + core::YamlConfiguration yamlConfig(test_controller.getContext()); const auto component = std::make_shared<DummyComponent>(); component->setSupportedProperties(std::array{ core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }), @@ -693,15 +634,9 @@ TEST_CASE("Test Exclusive Property", "[YamlConfigurationExclusiveProperty]") { } TEST_CASE("Test Regex Property", "[YamlConfigurationRegexProperty]") { - TestController test_controller; - LogTestController &logTestController = LogTestController::getInstance(); - logTestController.setDebug<TestPlan>(); - logTestController.setDebug<core::YamlConfiguration>(); - std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository"); - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration); - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration}); + ConfigurationTestController test_controller; + + core::YamlConfiguration yamlConfig(test_controller.getContext()); const auto component = std::make_shared<DummyComponent>(); component->setSupportedProperties(std::array{ core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }), @@ -712,16 +647,9 @@ TEST_CASE("Test Regex Property", "[YamlConfigurationRegexProperty]") { } TEST_CASE("Test Exclusive Property 2", "[YamlConfigurationExclusiveProperty2]") { - TestController test_controller; - - LogTestController &logTestController = LogTestController::getInstance(); - logTestController.setDebug<TestPlan>(); - 
logTestController.setDebug<core::YamlConfiguration>(); - std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository"); - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration); - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration}); + ConfigurationTestController test_controller; + + core::YamlConfiguration yamlConfig(test_controller.getContext()); const auto component = std::make_shared<DummyComponent>(); component->setSupportedProperties(std::array{ core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }), @@ -740,15 +668,8 @@ TEST_CASE("Test Exclusive Property 2", "[YamlConfigurationExclusiveProperty2]") } TEST_CASE("Test Regex Property 2", "[YamlConfigurationRegexProperty2]") { - TestController test_controller; - LogTestController &logTestController = LogTestController::getInstance(); - logTestController.setDebug<TestPlan>(); - logTestController.setDebug<core::YamlConfiguration>(); - std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository"); - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration); - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration}); + ConfigurationTestController test_controller; + core::YamlConfiguration yamlConfig(test_controller.getContext()); const auto component = std::make_shared<DummyComponent>(); 
component->setSupportedProperties(std::array{ core::Property("Prop A", "Prop A desc", "val A", true, "", { }, { }), @@ -767,13 +688,9 @@ TEST_CASE("Test Regex Property 2", "[YamlConfigurationRegexProperty2]") { } TEST_CASE("Test YAML Config With Funnel", "[YamlConfiguration]") { - TestController test_controller; + ConfigurationTestController test_controller; - std::shared_ptr<core::Repository> testFlowFileRepo = core::createRepository("flowfilerepository"); - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - std::shared_ptr<minifi::io::StreamFactory> streamFactory = minifi::io::StreamFactory::getInstance(configuration); - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - core::YamlConfiguration yamlConfig({testFlowFileRepo, content_repo, streamFactory, configuration}); + core::YamlConfiguration yamlConfig(test_controller.getContext()); static const std::string CONFIG_YAML_WITH_FUNNEL = R"( @@ -858,12 +775,8 @@ Remote Process Groups: [] } TEST_CASE("Test UUID duplication checks", "[YamlConfiguration]") { - TestController test_controller; - std::shared_ptr<core::Repository> test_flow_file_repo = core::createRepository("flowfilerepository"); - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration); - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - core::YamlConfiguration yaml_config({test_flow_file_repo, content_repo, stream_factory, configuration}); + ConfigurationTestController test_controller; + core::YamlConfiguration yaml_config(test_controller.getContext()); for (char i = '1'; i <= '8'; ++i) { DYNAMIC_SECTION("Changing UUID 00000000-0000-0000-0000-00000000000" << i << " to be a duplicate") { diff --git 
a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt index 354e944e2..dc51d0148 100644 --- a/libminifi/CMakeLists.txt +++ b/libminifi/CMakeLists.txt @@ -60,7 +60,7 @@ if (NOT OPENSSL_OFF) set(TLS_SOURCES "src/utils/tls/*.cpp" "src/io/tls/*.cpp") endif() -file(GLOB SOURCES "src/agent/*.cpp" "src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/logging/internal/*.cpp" "src/core/logging/alert/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" " [...] +file(GLOB SOURCES "src/agent/*.cpp" "src/properties/*.cpp" "src/utils/file/*.cpp" "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/logging/internal/*.cpp" "src/core/logging/alert/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" " [...] 
# manually add this as it might not yet be present when this executes list(APPEND SOURCES "${CMAKE_CURRENT_BINARY_DIR}/agent_version.cpp") diff --git a/libminifi/include/RemoteProcessorGroupPort.h b/libminifi/include/RemoteProcessorGroupPort.h index 6cf1778ce..3e8cee690 100644 --- a/libminifi/include/RemoteProcessorGroupPort.h +++ b/libminifi/include/RemoteProcessorGroupPort.h @@ -160,6 +160,10 @@ class RemoteProcessorGroupPort : public core::Processor { } } + std::vector<RPG> getInstances() const { + return nifi_instances_; + } + void setHTTPProxy(const utils::HTTPProxy &proxy) { this->proxy_ = proxy; } diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h index af43f8e02..89b1aa2cd 100644 --- a/libminifi/include/core/ProcessGroup.h +++ b/libminifi/include/core/ProcessGroup.h @@ -60,8 +60,6 @@ enum ProcessGroupType { REMOTE_PROCESS_GROUP, }; -#define ONSCHEDULE_RETRY_INTERVAL 30000 // millisecs - class ProcessGroup : public CoreComponent { friend struct ::ProcessGroupTestAccessor; public: diff --git a/libminifi/include/core/ProcessorConfig.h b/libminifi/include/core/ProcessorConfig.h index fee9bd7e9..d454a3f69 100644 --- a/libminifi/include/core/ProcessorConfig.h +++ b/libminifi/include/core/ProcessorConfig.h @@ -30,11 +30,11 @@ namespace minifi { namespace core { -#define DEFAULT_SCHEDULING_STRATEGY "TIMER_DRIVEN" -#define DEFAULT_SCHEDULING_PERIOD_STR "1 sec" +constexpr const char* DEFAULT_SCHEDULING_STRATEGY{"TIMER_DRIVEN"}; +constexpr const char* DEFAULT_SCHEDULING_PERIOD_STR{"1 sec"}; constexpr std::chrono::milliseconds DEFAULT_SCHEDULING_PERIOD_MILLIS{1000}; constexpr std::chrono::nanoseconds DEFAULT_RUN_DURATION{0}; -#define DEFAULT_MAX_CONCURRENT_TASKS 1 +constexpr int DEFAULT_MAX_CONCURRENT_TASKS{1}; constexpr std::chrono::seconds DEFAULT_YIELD_PERIOD_SECONDS{1}; constexpr std::chrono::seconds DEFAULT_PENALIZATION_PERIOD{30}; diff --git a/libminifi/include/core/json/JsonConfiguration.h 
b/libminifi/include/core/flow/AdaptiveConfiguration.h similarity index 55% rename from libminifi/include/core/json/JsonConfiguration.h rename to libminifi/include/core/flow/AdaptiveConfiguration.h index 35623381d..67a4c5885 100644 --- a/libminifi/include/core/json/JsonConfiguration.h +++ b/libminifi/include/core/flow/AdaptiveConfiguration.h @@ -15,37 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #pragma once -#include <memory> -#include <optional> #include <string> -#include <unordered_set> +#include <memory> -#include "core/FlowConfiguration.h" -#include "core/logging/LoggerConfiguration.h" -#include "core/ProcessorConfig.h" -#include "Exception.h" -#include "io/StreamFactory.h" -#include "io/validation.h" -#include "sitetosite/SiteToSite.h" -#include "utils/Id.h" -#include "utils/StringUtils.h" -#include "utils/file/FileSystem.h" -#include "core/flow/StructuredConfiguration.h" +#include "StructuredConfiguration.h" -namespace org::apache::nifi::minifi::core { +namespace org::apache::nifi::minifi::core::flow { -class JsonConfiguration : public flow::StructuredConfiguration { +class AdaptiveConfiguration : public StructuredConfiguration { public: - explicit JsonConfiguration(ConfigurationContext ctx); - - ~JsonConfiguration() override = default; - - std::unique_ptr<core::ProcessGroup> getRoot() override; + explicit AdaptiveConfiguration(ConfigurationContext ctx); - std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &json_config) override; + std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &payload) override; }; -} // namespace org::apache::nifi::minifi::core +} // namespace org::apache::nifi::minifi::core::flow diff --git a/libminifi/include/core/flow/CheckRequiredField.h b/libminifi/include/core/flow/CheckRequiredField.h index 6c35b0cbc..11dc2b1e1 100644 --- a/libminifi/include/core/flow/CheckRequiredField.h +++ 
b/libminifi/include/core/flow/CheckRequiredField.h @@ -27,7 +27,7 @@ namespace org::apache::nifi::minifi::core::flow { bool isFieldPresent(const Node &node, std::string_view field_name); -std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names, std::string_view section = ""); +std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names); /** * This is a helper function for verifying the existence of a required @@ -47,8 +47,8 @@ std::string buildErrorMessage(const Node &node, const std::vector<std::string> & * @throws std::invalid_argument if the required field 'field_name' is * not present in 'node' */ -void checkRequiredField(const Node &node, std::string_view field_name, std::string_view section = "", std::string_view error_message = ""); +void checkRequiredField(const Node &node, const std::vector<std::string>& field_name, std::string_view error_message = ""); -std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view section, std::string_view error_message = {}); +std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view error_message = {}); } // namespace org::apache::nifi::minifi::core::flow diff --git a/libminifi/include/core/flow/FlowSchema.h b/libminifi/include/core/flow/FlowSchema.h new file mode 100644 index 000000000..0badac303 --- /dev/null +++ b/libminifi/include/core/flow/FlowSchema.h @@ -0,0 +1,88 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <vector> +#include <string> + +namespace org::apache::nifi::minifi::core::flow { + +struct FlowSchema { + using Keys = std::vector<std::string>; + + Keys flow_header; + Keys root_group; + + Keys processors; + Keys processor_properties; + Keys autoterminated_rels; + Keys max_concurrent_tasks; + Keys penalization_period; + Keys proc_yield_period; + Keys runduration_nanos; + Keys onschedule_retry_interval; + + Keys connections; + Keys max_queue_size; + Keys max_queue_data_size; + Keys swap_threshold; + Keys source_id; + Keys source_name; + Keys destination_id; + Keys destination_name; + Keys flowfile_expiration; + Keys drop_empty; + Keys source_relationship; + Keys source_relationship_list; + + Keys process_groups; + Keys process_group_version; + Keys scheduling_strategy; + Keys scheduling_period; + Keys name; + Keys identifier; + Keys type; + Keys controller_services; + Keys controller_service_properties; + Keys remote_process_group; + Keys provenance_reporting; + Keys provenance_reporting_port_uuid; + Keys provenance_reporting_batch_size; + Keys funnels; + Keys input_ports; + Keys output_ports; + Keys rpg_url; + Keys rpg_yield_period; + Keys rpg_timeout; + Keys rpg_local_network_interface; + Keys rpg_transport_protocol; + Keys rpg_proxy_host; + Keys rpg_proxy_user; + Keys rpg_proxy_password; + Keys rpg_proxy_port; + Keys rpg_input_ports; + Keys rpg_output_ports; + Keys rpg_port_properties; + Keys rpg_port_target_id; + + static FlowSchema getDefault(); + static FlowSchema getNiFiFlowJson(); +}; + +} // namespace 
org::apache::nifi::minifi::core::flow diff --git a/libminifi/include/core/flow/Node.h b/libminifi/include/core/flow/Node.h index 78735f95d..1bd3ad7e2 100644 --- a/libminifi/include/core/flow/Node.h +++ b/libminifi/include/core/flow/Node.h @@ -25,6 +25,8 @@ #include <memory> #include <utility> #include "nonstd/expected.hpp" +#include "utils/StringUtils.h" +#include "utils/gsl.h" namespace org::apache::nifi::minifi::core::flow { @@ -37,6 +39,7 @@ class Node { }; class Iterator { + friend class Node; public: class Value; @@ -53,6 +56,7 @@ class Node { Iterator& operator++() { impl_->operator++(); + ++idx_; return *this; } @@ -75,6 +79,8 @@ class Node { private: std::unique_ptr<IteratorImpl> impl_; + std::string path_; + int idx_{0}; }; class NodeImpl { @@ -88,6 +94,7 @@ class Node { virtual nonstd::expected<bool, std::exception_ptr> getBool() const = 0; virtual nonstd::expected<int64_t, std::exception_ptr> getInt64() const = 0; virtual nonstd::expected<std::string, std::exception_ptr> getIntegerAsString() const = 0; + virtual nonstd::expected<std::string, std::exception_ptr> getScalarAsString() const = 0; virtual std::string getDebugString() const = 0; @@ -98,6 +105,8 @@ class Node { virtual std::optional<Cursor> getCursor() const = 0; + virtual Node createEmpty() const = 0; + virtual ~NodeImpl() = default; }; @@ -113,6 +122,7 @@ class Node { nonstd::expected<bool, std::exception_ptr> getBool() const {return impl_->getBool();} nonstd::expected<int64_t, std::exception_ptr> getInt64() const {return impl_->getInt64();} nonstd::expected<std::string, std::exception_ptr> getIntegerAsString() const {return impl_->getIntegerAsString();} + nonstd::expected<std::string, std::exception_ptr> getScalarAsString() const {return impl_->getScalarAsString();} // return a string representation of the node (need not to be deserializable) std::string getDebugString() const {return impl_->getDebugString();} @@ -121,14 +131,57 @@ class Node { size_t empty() const { return size() == 0; } - 
Iterator begin() const {return impl_->begin();} - Iterator end() const {return impl_->end();} - Node operator[](std::string_view key) const {return impl_->operator[](key);} + Iterator begin() const { + Iterator it = impl_->begin(); + it.path_ = path_; + return it; + } + Iterator end() const { + Iterator it = impl_->end(); + it.path_ = path_; + return it; + } + + // considers @key to be a member of this node as is + Node getMember(std::string_view key) { + Node result = impl_->operator[](key); + result.path_ = utils::StringUtils::join_pack(path_, "/", key); + return result; + } + + // considers @key to be a '/'-delimited access path + Node operator[](std::string_view key) const { + Node result = *this; + for (auto& field : utils::StringUtils::split(std::string{key}, "/")) { + if (key == ".") { + // pass: self + } else { + result = result.getMember(field); + } + if (!result) { + break; + } + } + return result; + } + + // considers @keys to be a set of viable access paths, the first viable is returned + Node operator[](gsl::span<const std::string> keys) const { + for (auto& key : keys) { + if (Node result = (*this)[key]) { + return result; + } + } + return impl_->createEmpty(); + } + + std::string getPath() const {return path_;} std::optional<Cursor> getCursor() const {return impl_->getCursor();} private: std::shared_ptr<NodeImpl> impl_; + std::string path_; }; class Node::Iterator::Value : public Node, public std::pair<Node, Node> { diff --git a/libminifi/include/core/flow/StructuredConfiguration.h b/libminifi/include/core/flow/StructuredConfiguration.h index 1b46cda9d..762a2b352 100644 --- a/libminifi/include/core/flow/StructuredConfiguration.h +++ b/libminifi/include/core/flow/StructuredConfiguration.h @@ -21,6 +21,7 @@ #include <optional> #include <string> #include <unordered_set> +#include <vector> #include "core/FlowConfiguration.h" #include "core/logging/LoggerConfiguration.h" @@ -33,19 +34,10 @@ #include "utils/StringUtils.h" #include "utils/file/FileSystem.h" 
#include "core/flow/Node.h" +#include "FlowSchema.h" namespace org::apache::nifi::minifi::core::flow { -static constexpr char const* CONFIG_FLOW_CONTROLLER_KEY = "Flow Controller"; -static constexpr char const* CONFIG_PROCESSORS_KEY = "Processors"; -static constexpr char const* CONFIG_CONTROLLER_SERVICES_KEY = "Controller Services"; -static constexpr char const* CONFIG_REMOTE_PROCESS_GROUP_KEY = "Remote Processing Groups"; -static constexpr char const* CONFIG_REMOTE_PROCESS_GROUP_KEY_V3 = "Remote Process Groups"; -static constexpr char const* CONFIG_PROVENANCE_REPORT_KEY = "Provenance Reporting"; -static constexpr char const* CONFIG_FUNNELS_KEY = "Funnels"; -static constexpr char const* CONFIG_INPUT_PORTS_KEY = "Input Ports"; -static constexpr char const* CONFIG_OUTPUT_PORTS_KEY = "Output Ports"; - class StructuredConfiguration : public FlowConfiguration { public: StructuredConfiguration(ConfigurationContext ctx, std::shared_ptr<logging::Logger> logger); @@ -60,6 +52,8 @@ class StructuredConfiguration : public FlowConfiguration { */ void validateComponentProperties(ConfigurableComponent& component, const std::string &component_name, const std::string &section) const; + std::unique_ptr<core::ProcessGroup> getRoot() override; + protected: /** * Returns a shared pointer to a ProcessGroup object containing the @@ -70,7 +64,7 @@ class StructuredConfiguration : public FlowConfiguration { * @return the root ProcessGroup node of the flow * configuration tree */ - std::unique_ptr<core::ProcessGroup> getRootFrom(const Node& root_node); + std::unique_ptr<core::ProcessGroup> getRootFrom(const Node& root_node, FlowSchema schema); std::unique_ptr<core::ProcessGroup> createProcessGroup(const Node& node, bool is_root = false); @@ -99,7 +93,7 @@ class StructuredConfiguration : public FlowConfiguration { * @param parent the parent ProcessGroup for the port * @param direction the TransferDirection of the port */ - void parsePort(const Node& port_node, core::ProcessGroup* parent, 
sitetosite::TransferDirection direction); + void parseRPGPort(const Node& port_node, core::ProcessGroup* parent, sitetosite::TransferDirection direction); /** * Parses the root level node for the flow configuration and @@ -155,7 +149,7 @@ class StructuredConfiguration : public FlowConfiguration { * @param properties_node the Node containing the properties * @param processor the Processor to which to add the resulting properties */ - void parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name, const std::string& section); + void parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name); /** * Parses the Funnels section of a configuration. @@ -195,9 +189,9 @@ class StructuredConfiguration : public FlowConfiguration { * is optional and defaults to 'id' * @return the parsed or generated UUID string */ - std::string getOrGenerateId(const Node& node, const std::string& id_field = "id"); + std::string getOrGenerateId(const Node& node); - std::string getRequiredIdField(const Node& node, std::string_view section = "", std::string_view error_message = ""); + std::string getRequiredIdField(const Node& node, std::string_view error_message = ""); /** * This is a helper function for getting an optional value, if it exists. @@ -213,7 +207,9 @@ class StructuredConfiguration : public FlowConfiguration { * the optional field is missing. If not provided, * a default info message will be generated. 
*/ - std::string getOptionalField(const Node& node, const std::string& field_name, const std::string& default_value, const std::string& section = "", const std::string& info_message = ""); + std::string getOptionalField(const Node& node, const std::vector<std::string>& field_name, const std::string& default_value, const std::string& info_message = ""); + + FlowSchema schema_; static std::shared_ptr<utils::IdGenerator> id_generator_; std::unordered_set<std::string> uuids_; diff --git a/libminifi/include/core/flow/StructuredConnectionParser.h b/libminifi/include/core/flow/StructuredConnectionParser.h index 2b1d73743..6d7869ea6 100644 --- a/libminifi/include/core/flow/StructuredConnectionParser.h +++ b/libminifi/include/core/flow/StructuredConnectionParser.h @@ -26,18 +26,19 @@ #include "core/flow/Node.h" #include "utils/gsl.h" +#include "core/flow/FlowSchema.h" namespace org::apache::nifi::minifi::core::flow { class StructuredConnectionParser { public: - static constexpr const char* CONFIG_CONNECTIONS_KEY{ "Connections" }; - - explicit StructuredConnectionParser(const Node& connectionNode, const std::string& name, gsl::not_null<core::ProcessGroup*> parent, const std::shared_ptr<logging::Logger>& logger) : + explicit StructuredConnectionParser(const Node& connectionNode, const std::string& name, gsl::not_null<core::ProcessGroup*> parent, + const std::shared_ptr<logging::Logger>& logger, std::optional<FlowSchema> schema = std::nullopt) : connectionNode_(connectionNode), name_(name), parent_(parent), - logger_(logger) { + logger_(logger), + schema_(schema.value_or(FlowSchema::getDefault())) { if (!connectionNode.isMap()) { throw std::logic_error("Connection node is not a map"); } @@ -60,6 +61,7 @@ class StructuredConnectionParser { const std::string& name_; gsl::not_null<core::ProcessGroup*> parent_; const std::shared_ptr<logging::Logger> logger_; + const FlowSchema schema_; }; } // namespace org::apache::nifi::minifi::core::flow diff --git 
a/libminifi/include/core/json/JsonNode.h b/libminifi/include/core/json/JsonNode.h index d19ca1582..bb11ca251 100644 --- a/libminifi/include/core/json/JsonNode.h +++ b/libminifi/include/core/json/JsonNode.h @@ -46,6 +46,10 @@ class JsonNode : public flow::Node::NodeImpl { return node_ ? node_->IsNull() : false; } + flow::Node createEmpty() const override { + return flow::Node{std::make_shared<JsonNode>(nullptr)}; + } + nonstd::expected<std::string, std::exception_ptr> getString() const override { try { if (!node_) { @@ -93,12 +97,27 @@ class JsonNode : public flow::Node::NodeImpl { if (!node_) throw std::runtime_error("Cannot get string from invalid json value"); if (node_->IsInt64()) return std::to_string(node_->GetInt64()); if (node_->IsUint64()) return std::to_string(node_->GetUint64()); + if (node_->IsString()) return std::string(node_->GetString(), node_->GetStringLength()); throw std::runtime_error("Cannot get string from non-integer json value"); } catch (...) { return nonstd::make_unexpected(std::current_exception()); } } + nonstd::expected<std::string, std::exception_ptr> getScalarAsString() const override { + try { + if (!node_) throw std::runtime_error("Cannot get string from invalid json value"); + if (node_->IsBool()) return node_->GetBool() ? "true" : "false"; + if (node_->IsInt64()) return std::to_string(node_->GetInt64()); + if (node_->IsUint64()) return std::to_string(node_->GetUint64()); + if (node_->IsString()) return std::string(node_->GetString(), node_->GetStringLength()); + if (node_->IsDouble()) return std::to_string(node_->GetDouble()); + throw std::runtime_error("Cannot convert non-scalar json value to string"); + } catch (...) 
{ + return nonstd::make_unexpected(std::current_exception()); + } + } + std::string getDebugString() const override { if (!node_) return "<invalid>"; if (node_->IsObject()) return "<Map>"; diff --git a/libminifi/include/core/state/nodes/SchedulingNodes.h b/libminifi/include/core/state/nodes/SchedulingNodes.h index 2262cd6e7..a98bb3bd6 100644 --- a/libminifi/include/core/state/nodes/SchedulingNodes.h +++ b/libminifi/include/core/state/nodes/SchedulingNodes.h @@ -49,7 +49,7 @@ class SchedulingDefaults : public DeviceInformation { SerializedResponseNode defaultSchedulingStrategy; defaultSchedulingStrategy.name = "defaultSchedulingStrategy"; - defaultSchedulingStrategy.value = DEFAULT_SCHEDULING_STRATEGY; + defaultSchedulingStrategy.value = core::DEFAULT_SCHEDULING_STRATEGY; schedulingDefaults.children.push_back(defaultSchedulingStrategy); @@ -67,7 +67,7 @@ class SchedulingDefaults : public DeviceInformation { SerializedResponseNode defaultMaxConcurrentTasks; defaultMaxConcurrentTasks.name = "defaultMaxConcurrentTasks"; - defaultMaxConcurrentTasks.value = DEFAULT_MAX_CONCURRENT_TASKS; + defaultMaxConcurrentTasks.value = core::DEFAULT_MAX_CONCURRENT_TASKS; schedulingDefaults.children.push_back(defaultMaxConcurrentTasks); diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h index 2b4a9d7d4..4d82ea394 100644 --- a/libminifi/include/core/yaml/YamlConfiguration.h +++ b/libminifi/include/core/yaml/YamlConfiguration.h @@ -44,15 +44,6 @@ class YamlConfiguration : public flow::StructuredConfiguration { ~YamlConfiguration() override = default; - /** - * Returns a shared pointer to a ProcessGroup object containing the - * flow configuration. - * - * @return the root ProcessGroup node of the flow - * configuration tree - */ - std::unique_ptr<core::ProcessGroup> getRoot() override; - /** * Returns a shared pointer to a ProcessGroup object containing the * flow configuration. 
The yamlConfigStream argument must point to diff --git a/libminifi/include/core/yaml/YamlNode.h b/libminifi/include/core/yaml/YamlNode.h index ad8422f6e..193c61d55 100644 --- a/libminifi/include/core/yaml/YamlNode.h +++ b/libminifi/include/core/yaml/YamlNode.h @@ -49,6 +49,10 @@ class YamlNode : public flow::Node::NodeImpl { return node_.IsNull(); } + flow::Node createEmpty() const override { + return flow::Node{std::make_shared<YamlNode>(YAML::Node{YAML::NodeType::Undefined})}; + } + nonstd::expected<std::string, std::exception_ptr> getString() const override { try { return node_.as<std::string>(); @@ -81,6 +85,14 @@ class YamlNode : public flow::Node::NodeImpl { } } + nonstd::expected<std::string, std::exception_ptr> getScalarAsString() const override { + try { + return node_.as<std::string>(); + } catch (...) { + return nonstd::make_unexpected(std::current_exception()); + } + } + std::string getDebugString() const override { if (!node_) return "<invalid>"; if (node_.IsNull()) return "null"; @@ -149,11 +161,11 @@ class YamlIterator : public flow::Node::Iterator::IteratorImpl { YAML::const_iterator it_; }; -flow::Node::Iterator YamlNode::begin() const { +inline flow::Node::Iterator YamlNode::begin() const { return flow::Node::Iterator{std::make_unique<YamlIterator>(node_.begin())}; } -flow::Node::Iterator YamlNode::end() const { +inline flow::Node::Iterator YamlNode::end() const { return flow::Node::Iterator{std::make_unique<YamlIterator>(node_.end())}; } diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp index 81e2bb1e0..277184996 100644 --- a/libminifi/src/core/ConfigurationFactory.cpp +++ b/libminifi/src/core/ConfigurationFactory.cpp @@ -29,7 +29,7 @@ #include "io/StreamFactory.h" #include "core/yaml/YamlConfiguration.h" -#include "core/json/JsonConfiguration.h" +#include "core/flow/AdaptiveConfiguration.h" namespace org::apache::nifi::minifi::core { @@ -38,13 +38,7 @@ std::unique_ptr<core::FlowConfiguration> 
createFlowConfiguration(const Configura if (configuration_class_name) { class_name_lc = configuration_class_name.value(); } else if (ctx.path) { - if (utils::StringUtils::endsWith(ctx.path->string(), ".yml")) { - class_name_lc = "yamlconfiguration"; - } else if (utils::StringUtils::endsWith(ctx.path->string(), ".json")) { - class_name_lc = "jsonconfiguration"; - } else { - throw std::runtime_error("Could not infer config type from file path"); - } + class_name_lc = "adaptiveconfiguration"; } else { throw std::runtime_error("Neither configuration class nor config file path has been specified"); } @@ -57,8 +51,8 @@ std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(const Configura } else if (class_name_lc == "yamlconfiguration") { // only load if the class is defined. return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(ctx)); - } else if (class_name_lc == "jsonconfiguration") { - return std::unique_ptr<core::JsonConfiguration>(instantiate<core::JsonConfiguration>(ctx)); + } else if (class_name_lc == "adaptiveconfiguration") { + return std::unique_ptr<core::flow::AdaptiveConfiguration>(instantiate<core::flow::AdaptiveConfiguration>(ctx)); } else { if (fail_safe) { return std::make_unique<core::FlowConfiguration>(ctx); diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index 7300d2129..509c0542d 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -33,6 +33,8 @@ using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::core { +constexpr int DEFAULT_ONSCHEDULE_RETRY_INTERVAL_MS = 30000; + std::shared_ptr<utils::IdGenerator> ProcessGroup::id_generator_ = utils::IdGenerator::getIdGenerator(); ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, const utils::Identifier& uuid) @@ -54,7 +56,7 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, const utils: if (parent_process_group_ != nullptr) 
{ onschedule_retry_msec_ = parent_process_group_->getOnScheduleRetryPeriod(); } else { - onschedule_retry_msec_ = ONSCHEDULE_RETRY_INTERVAL; + onschedule_retry_msec_ = DEFAULT_ONSCHEDULE_RETRY_INTERVAL_MS; } transmitting_ = false; transport_protocol_ = "RAW"; @@ -69,7 +71,7 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name) parent_process_group_(nullptr), logger_(logging::LoggerFactory<ProcessGroup>::getLogger()) { yield_period_msec_ = 0ms; - onschedule_retry_msec_ = ONSCHEDULE_RETRY_INTERVAL; + onschedule_retry_msec_ = DEFAULT_ONSCHEDULE_RETRY_INTERVAL_MS; transmitting_ = false; transport_protocol_ = "RAW"; diff --git a/libminifi/src/core/flow/AdaptiveConfiguration.cpp b/libminifi/src/core/flow/AdaptiveConfiguration.cpp new file mode 100644 index 000000000..cef220bb4 --- /dev/null +++ b/libminifi/src/core/flow/AdaptiveConfiguration.cpp @@ -0,0 +1,67 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "core/flow/AdaptiveConfiguration.h" + +#include "rapidjson/document.h" +#include "core/json/JsonNode.h" +#include "core/yaml/YamlNode.h" +#include "yaml-cpp/yaml.h" +#include "utils/file/FileUtils.h" +#include "Defaults.h" + +namespace org::apache::nifi::minifi::core::flow { + +AdaptiveConfiguration::AdaptiveConfiguration(ConfigurationContext ctx) + : StructuredConfiguration(([&] { + if (!ctx.path) { + if (utils::file::exists(DEFAULT_NIFI_CONFIG_JSON)) { + ctx.path = DEFAULT_NIFI_CONFIG_JSON; + } else { + ctx.path = DEFAULT_NIFI_CONFIG_YML; + } + } + return std::move(ctx); + })(), + logging::LoggerFactory<AdaptiveConfiguration>::getLogger()) {} + +std::unique_ptr<core::ProcessGroup> AdaptiveConfiguration::getRootFromPayload(const std::string &payload) { + try { + rapidjson::Document doc; + rapidjson::ParseResult res = doc.Parse(payload.c_str(), payload.length()); + if (res) { + flow::Node root{std::make_shared<JsonNode>(&doc)}; + if (root[FlowSchema::getDefault().flow_header]) { + logger_->log_debug("Processing configuration as default json"); + return getRootFrom(root, FlowSchema::getDefault()); + } else { + logger_->log_debug("Processing configuration as nifi flow json"); + return getRootFrom(root, FlowSchema::getNiFiFlowJson()); + } + } + logger_->log_debug("Could not parse configuration as json, trying yaml"); + YAML::Node rootYamlNode = YAML::Load(payload); + flow::Node root{std::make_shared<YamlNode>(rootYamlNode)}; + return getRootFrom(root, FlowSchema::getDefault()); + } catch(...) 
{ + logger_->log_error("Invalid configuration file"); + throw; + } +} + +} // namespace org::apache::nifi::minifi::core::flow diff --git a/libminifi/src/core/flow/CheckRequiredField.cpp b/libminifi/src/core/flow/CheckRequiredField.cpp index 6942a5844..4539d9d78 100644 --- a/libminifi/src/core/flow/CheckRequiredField.cpp +++ b/libminifi/src/core/flow/CheckRequiredField.cpp @@ -27,7 +27,7 @@ bool isFieldPresent(const Node &node, std::string_view field_name) { return bool{node[field_name]}; } -std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names, std::string_view section) { +std::string buildErrorMessage(const Node &node, const std::vector<std::string> &alternate_field_names) { const Node name_node = node["name"]; // Build a helpful error message for the user so they can fix the // invalid config file, using the component name if present @@ -36,32 +36,32 @@ std::string buildErrorMessage(const Node &node, const std::vector<std::string> & name_node ? 
"Unable to parse configuration file for component named '" + name_node.getString().value() + "' as none of the possible required fields [" + field_list_string + "] is available" : "Unable to parse configuration file as none of the possible required fields [" + field_list_string + "] is available"; - if (!section.empty()) { - err_msg += " [in '" + std::string(section) + "' section of configuration file]"; - } + + err_msg += " [in '" + node.getPath() + "' section of configuration file]"; + if (auto cursor = node.getCursor()) { err_msg += " [line:column, pos at " + std::to_string(cursor->line) + ":" + std::to_string(cursor->column) + ", " + std::to_string(cursor->pos) + "]"; } return err_msg; } -void checkRequiredField(const Node &node, std::string_view field_name, std::string_view section, std::string_view error_message) { - if (!isFieldPresent(node, field_name)) { +void checkRequiredField(const Node &node, const std::vector<std::string>& field_names, std::string_view error_message) { + if (std::none_of(field_names.begin(), field_names.end(), [&] (auto& field) {return isFieldPresent(node, field);})) { if (error_message.empty()) { - throw std::invalid_argument(buildErrorMessage(node, std::vector<std::string>{std::string(field_name)}, section)); + throw std::invalid_argument(buildErrorMessage(node, field_names)); } throw std::invalid_argument(error_message.data()); } } -std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view section, std::string_view error_message) { +std::string getRequiredField(const Node &node, const std::vector<std::string> &alternate_names, std::string_view error_message) { for (const auto& name : alternate_names) { if (isFieldPresent(node, name)) { return node[name].getString().value(); } } if (error_message.empty()) { - throw std::invalid_argument(buildErrorMessage(node, alternate_names, section)); + throw std::invalid_argument(buildErrorMessage(node, alternate_names)); } throw 
std::invalid_argument(error_message.data()); } diff --git a/libminifi/src/core/flow/FlowSchema.cpp b/libminifi/src/core/flow/FlowSchema.cpp new file mode 100644 index 000000000..a5df0dc64 --- /dev/null +++ b/libminifi/src/core/flow/FlowSchema.cpp @@ -0,0 +1,144 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "core/flow/FlowSchema.h" + +namespace org::apache::nifi::minifi::core::flow { + +FlowSchema FlowSchema::getDefault() { + return FlowSchema{ + .flow_header = {"Flow Controller"}, + .root_group = {"."}, + + .processors = {"Processors"}, + .processor_properties = {"Properties"}, + .autoterminated_rels = {"auto-terminated relationships list"}, + .max_concurrent_tasks = {"max concurrent tasks"}, + .penalization_period = {"penalization period"}, + .proc_yield_period = {"yield period"}, + .runduration_nanos = {"run duration nanos"}, + .onschedule_retry_interval = {"onschedule retry interval"}, + + .connections = {"Connections"}, + .max_queue_size = {"max work queue size"}, + .max_queue_data_size = {"max work queue data size"}, + .swap_threshold = {"swap threshold"}, + .source_id = {"source id"}, + .source_name = {"source name"}, + .destination_id = {"destination id"}, + .destination_name = {"destination name"}, + .flowfile_expiration = {"flowfile expiration"}, + .drop_empty = {"drop empty"}, + .source_relationship = {"source relationship name"}, + .source_relationship_list = {"source relationship names"}, + + .process_groups = {"Process Groups"}, + .process_group_version = {"version"}, + .scheduling_strategy = {"scheduling strategy"}, + .scheduling_period = {"scheduling period"}, + .name = {"name"}, + .identifier = {"id"}, + .type = {"class", "type"}, + .controller_services = {"Controller Services"}, + .controller_service_properties = {"Properties"}, + .remote_process_group = {"Remote Processing Groups", "Remote Process Groups"}, + .provenance_reporting = {"Provenance Reporting"}, + .provenance_reporting_port_uuid = {"port uuid"}, + .provenance_reporting_batch_size = {"batch size"}, + .funnels = {"Funnels"}, + .input_ports = {"Input Ports"}, + .output_ports = {"Output Ports"}, + + .rpg_url = {"url"}, + .rpg_yield_period = {"yield period"}, + .rpg_timeout = {"timeout"}, + .rpg_local_network_interface = {"local network interface"}, + .rpg_transport_protocol 
= {"transport protocol"}, + .rpg_proxy_host = {"proxy host"}, + .rpg_proxy_user = {"proxy user"}, + .rpg_proxy_password = {"proxy password"}, + .rpg_proxy_port = {"proxy port"}, + .rpg_input_ports = {"Input Ports"}, + .rpg_output_ports = {"Output Ports"}, + .rpg_port_properties = {"Properties"}, + .rpg_port_target_id = {} + }; +} + +FlowSchema FlowSchema::getNiFiFlowJson() { + return FlowSchema{ + .flow_header = {"rootGroup"}, + .root_group = {"rootGroup"}, + .processors = {"processors"}, + .processor_properties = {"properties"}, + .autoterminated_rels = {"autoTerminatedRelationships"}, + .max_concurrent_tasks = {"concurrentlySchedulableTaskCount"}, + .penalization_period = {"penaltyDuration"}, + .proc_yield_period = {"yieldDuration"}, + // TODO(adebreceni): MINIFICPP-2033 since this is unused the mismatch between nano and milli is not an issue + .runduration_nanos = {"runDurationMillis"}, + .onschedule_retry_interval = {}, + + .connections = {"connections"}, + .max_queue_size = {"backPressureObjectThreshold"}, + .max_queue_data_size = {"backPressureDataSizeThreshold"}, + .swap_threshold = {}, + .source_id = {"source/id"}, + .source_name = {"source/name"}, + .destination_id = {"destination/id"}, + .destination_name = {"destination/name"}, + .flowfile_expiration = {"flowFileExpiration"}, + // contrary to nifi we support dropEmpty in flow json as well + .drop_empty = {"dropEmpty"}, + .source_relationship = {}, + .source_relationship_list = {"selectedRelationships"}, + + .process_groups = {"processGroups"}, + .process_group_version = {}, + .scheduling_strategy = {"schedulingStrategy"}, + .scheduling_period = {"schedulingPeriod"}, + .name = {"name"}, + .identifier = {"identifier"}, + .type = {"type"}, + .controller_services = {"controllerServices"}, + .controller_service_properties = {"properties"}, + .remote_process_group = {"remoteProcessGroups"}, + .provenance_reporting = {}, + .provenance_reporting_port_uuid = {}, + .provenance_reporting_batch_size = {}, + .funnels 
= {"funnels"}, + .input_ports = {"inputPorts"}, + .output_ports = {"outputPorts"}, + + .rpg_url = {"targetUri"}, + .rpg_yield_period = {"yieldDuration"}, + .rpg_timeout = {"communicationsTimeout"}, + .rpg_local_network_interface = {"localNetworkInterface"}, + .rpg_transport_protocol = {"transportProtocol"}, + .rpg_proxy_host = {"proxyHost"}, + .rpg_proxy_user = {"proxyUser"}, + .rpg_proxy_password = {"proxyPassword"}, + .rpg_proxy_port = {"proxyPort"}, + .rpg_input_ports = {"inputPorts"}, + .rpg_output_ports = {"outputPorts"}, + .rpg_port_properties = {}, + .rpg_port_target_id = {"targetId"} + }; +} + +} // namespace org::apache::nifi::minifi::core::flow diff --git a/libminifi/src/core/flow/Node.cpp b/libminifi/src/core/flow/Node.cpp index 7fd7b0df8..1c751ebe0 100644 --- a/libminifi/src/core/flow/Node.cpp +++ b/libminifi/src/core/flow/Node.cpp @@ -21,7 +21,15 @@ namespace org::apache::nifi::minifi::core::flow { Node::Iterator::Value Node::Iterator::operator*() const { - return impl_->operator*(); + Value value = impl_->operator*(); + if (value) { + // sequence iterator + value.path_ = utils::StringUtils::join_pack(path_, "/", std::to_string(idx_)); + } else if (value.second) { + // map iterator + value.second.path_ = utils::StringUtils::join_pack(path_, "/", value.first.getString().value()); + } + return value; } } // namespace org::apache::nifi::minifi::core::flow diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp index 7e1d41e63..02624a4dc 100644 --- a/libminifi/src/core/flow/StructuredConfiguration.cpp +++ b/libminifi/src/core/flow/StructuredConfiguration.cpp @@ -34,13 +34,26 @@ namespace org::apache::nifi::minifi::core::flow { std::shared_ptr<utils::IdGenerator> StructuredConfiguration::id_generator_ = utils::IdGenerator::getIdGenerator(); +std::unique_ptr<core::ProcessGroup> StructuredConfiguration::getRoot() { + if (!config_path_) { + logger_->log_error("Cannot instantiate flow, no config 
file is set."); + throw Exception(ExceptionType::FLOW_EXCEPTION, "No config file specified"); + } + const auto configuration = filesystem_->read(config_path_.value()); + if (!configuration) { + // non-existence of flow config file is not a dealbreaker, the caller might fetch it from network + return nullptr; + } + return getRootFromPayload(configuration.value()); +} + StructuredConfiguration::StructuredConfiguration(ConfigurationContext ctx, std::shared_ptr<logging::Logger> logger) : FlowConfiguration(std::move(ctx)), logger_(std::move(logger)) {} std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseRootProcessGroup(const Node& root_flow_node) { - auto flow_controller_node = root_flow_node[CONFIG_FLOW_CONTROLLER_KEY]; - auto root_group = parseProcessGroup(flow_controller_node, root_flow_node, true); + checkRequiredField(root_flow_node, schema_.flow_header); + auto root_group = parseProcessGroup(root_flow_node[schema_.flow_header], root_flow_node[schema_.root_group], true); this->name_ = root_group->getName(); return root_group; } @@ -48,15 +61,15 @@ std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseRootProcessGro std::unique_ptr<core::ProcessGroup> StructuredConfiguration::createProcessGroup(const Node& node, bool is_root) { int version = 0; - checkRequiredField(node, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY); - auto flowName = node["name"].getString().value(); + checkRequiredField(node, schema_.name); + auto flowName = node[schema_.name].getString().value(); utils::Identifier uuid; // assignment throws on invalid uuid uuid = getOrGenerateId(node); - if (node["version"]) { - version = gsl::narrow<int>(node["version"].getInt64().value()); + if (node[schema_.process_group_version]) { + version = gsl::narrow<int>(node[schema_.process_group_version].getInt64().value()); } logger_->log_debug("parseRootProcessGroup: id => [%s], name => [%s]", uuid.to_string(), flowName); @@ -67,8 +80,8 @@ std::unique_ptr<core::ProcessGroup> 
StructuredConfiguration::createProcessGroup( group = FlowConfiguration::createSimpleProcessGroup(flowName, uuid, version); } - if (node["onschedule retry interval"]) { - auto onScheduleRetryPeriod = node["onschedule retry interval"].getString().value(); + if (node[schema_.onschedule_retry_interval]) { + auto onScheduleRetryPeriod = node[schema_.onschedule_retry_interval].getString().value(); logger_->log_debug("parseRootProcessGroup: onschedule retry period => [%s]", onScheduleRetryPeriod); auto on_schedule_retry_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(onScheduleRetryPeriod); @@ -83,20 +96,13 @@ std::unique_ptr<core::ProcessGroup> StructuredConfiguration::createProcessGroup( std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseProcessGroup(const Node& header_node, const Node& node, bool is_root) { auto group = createProcessGroup(header_node, is_root); - Node processorsNode = node[CONFIG_PROCESSORS_KEY]; - Node connectionsNode = node[StructuredConnectionParser::CONFIG_CONNECTIONS_KEY]; - Node funnelsNode = node[CONFIG_FUNNELS_KEY]; - Node inputPortsNode = node[CONFIG_INPUT_PORTS_KEY]; - Node outputPortsNode = node[CONFIG_OUTPUT_PORTS_KEY]; - Node remoteProcessingGroupsNode = [&] { - // assignment is not supported on invalid nodes - Node candidate = node[CONFIG_REMOTE_PROCESS_GROUP_KEY]; - if (candidate) { - return candidate; - } - return node[CONFIG_REMOTE_PROCESS_GROUP_KEY_V3]; - }(); - Node childProcessGroupNodeSeq = node["Process Groups"]; + Node processorsNode = node[schema_.processors]; + Node connectionsNode = node[schema_.connections]; + Node funnelsNode = node[schema_.funnels]; + Node inputPortsNode = node[schema_.input_ports]; + Node outputPortsNode = node[schema_.output_ports]; + Node remoteProcessingGroupsNode = node[schema_.remote_process_group]; + Node childProcessGroupNodeSeq = node[schema_.process_groups]; parseProcessorNode(processorsNode, group.get()); 
parseRemoteProcessGroup(remoteProcessingGroupsNode, group.get()); @@ -115,10 +121,11 @@ std::unique_ptr<core::ProcessGroup> StructuredConfiguration::parseProcessGroup(c return group; } -std::unique_ptr<core::ProcessGroup> StructuredConfiguration::getRootFrom(const Node& root_node) { +std::unique_ptr<core::ProcessGroup> StructuredConfiguration::getRootFrom(const Node& root_node, FlowSchema schema) { + schema_ = std::move(schema); uuids_.clear(); - Node controllerServiceNode = root_node[CONFIG_CONTROLLER_SERVICES_KEY]; - Node provenanceReportNode = root_node[CONFIG_PROVENANCE_REPORT_KEY]; + Node controllerServiceNode = root_node[schema_.root_group][schema_.controller_services]; + Node provenanceReportNode = root_node[schema_.provenance_reporting]; parseControllerServices(controllerServiceNode); // Create the root process group @@ -157,14 +164,14 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co for (const auto& procNode : processors_node) { core::ProcessorConfig procCfg; - checkRequiredField(procNode, "name", CONFIG_PROCESSORS_KEY); - procCfg.name = procNode["name"].getString().value(); + checkRequiredField(procNode, schema_.name); + procCfg.name = procNode[schema_.name].getString().value(); procCfg.id = getOrGenerateId(procNode); uuid = procCfg.id; logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id); - checkRequiredField(procNode, "class", CONFIG_PROCESSORS_KEY); - procCfg.javaClass = procNode["class"].getString().value(); + checkRequiredField(procNode, schema_.type); + procCfg.javaClass = procNode[schema_.type].getString().value(); logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass); // Determine the processor name only from the Java class @@ -187,36 +194,35 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co processor->setFlowIdentifier(flow_version_->getFlowIdentifier()); - procCfg.schedulingStrategy = getOptionalField(procNode, 
"scheduling strategy", DEFAULT_SCHEDULING_STRATEGY, CONFIG_PROCESSORS_KEY); + procCfg.schedulingStrategy = getOptionalField(procNode, schema_.scheduling_strategy, DEFAULT_SCHEDULING_STRATEGY); logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy); - procCfg.schedulingPeriod = getOptionalField(procNode, "scheduling period", DEFAULT_SCHEDULING_PERIOD_STR, CONFIG_PROCESSORS_KEY); + procCfg.schedulingPeriod = getOptionalField(procNode, schema_.scheduling_period, DEFAULT_SCHEDULING_PERIOD_STR); logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod); - if (auto tasksNode = procNode["max concurrent tasks"]) { + if (auto tasksNode = procNode[schema_.max_concurrent_tasks]) { procCfg.maxConcurrentTasks = tasksNode.getIntegerAsString().value(); logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks); } - if (procNode["penalization period"]) { - procCfg.penalizationPeriod = procNode["penalization period"].getString().value(); + if (auto penalizationNode = procNode[schema_.penalization_period]) { + procCfg.penalizationPeriod = penalizationNode.getString().value(); logger_->log_debug("parseProcessorNode: penalization period => [%s]", procCfg.penalizationPeriod); } - if (procNode["yield period"]) { - procCfg.yieldPeriod = procNode["yield period"].getString().value(); + if (auto yieldNode = procNode[schema_.proc_yield_period]) { + procCfg.yieldPeriod = yieldNode.getString().value(); logger_->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod); } - if (auto runNode = procNode["run duration nanos"]) { + if (auto runNode = procNode[schema_.runduration_nanos]) { procCfg.runDurationNanos = runNode.getIntegerAsString().value(); logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos); } // handle auto-terminated relationships - if (procNode["auto-terminated relationships list"]) { - Node 
autoTerminatedSequence = procNode["auto-terminated relationships list"]; + if (Node autoTerminatedSequence = procNode[schema_.autoterminated_rels]) { std::vector<std::string> rawAutoTerminatedRelationshipValues; if (autoTerminatedSequence.isSequence() && autoTerminatedSequence.size() > 0) { for (const auto& autoTerminatedRel : autoTerminatedSequence) { @@ -227,9 +233,8 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co } // handle processor properties - if (procNode["Properties"]) { - Node propertiesNode = procNode["Properties"]; - parsePropertiesNode(propertiesNode, *processor, procCfg.name, CONFIG_PROCESSORS_KEY); + if (Node propertiesNode = procNode[schema_.processor_properties]) { + parsePropertiesNode(propertiesNode, *processor, procCfg.name); } // Take care of scheduling @@ -304,13 +309,13 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq, return; } for (const auto& currRpgNode : rpg_node_seq) { - checkRequiredField(currRpgNode, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY); - auto name = currRpgNode["name"].getString().value(); + checkRequiredField(currRpgNode, schema_.name); + auto name = currRpgNode[schema_.name].getString().value(); id = getOrGenerateId(currRpgNode); logger_->log_debug("parseRemoteProcessGroup: name => [%s], id => [%s]", name, id); - auto url = getOptionalField(currRpgNode, "url", "", CONFIG_REMOTE_PROCESS_GROUP_KEY); + auto url = getOptionalField(currRpgNode, schema_.rpg_url, ""); logger_->log_debug("parseRemoteProcessGroup: url => [%s]", url); @@ -318,8 +323,8 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq, auto group = createRemoteProcessGroup(name, uuid); group->setParent(parentGroup); - if (currRpgNode["yield period"]) { - auto yieldPeriod = currRpgNode["yield period"].getString().value(); + if (currRpgNode[schema_.rpg_yield_period]) { + auto yieldPeriod = currRpgNode[schema_.rpg_yield_period].getString().value(); 
logger_->log_debug("parseRemoteProcessGroup: yield period => [%s]", yieldPeriod); auto yield_period_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(yieldPeriod); @@ -329,8 +334,8 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq, } } - if (currRpgNode["timeout"]) { - auto timeout = currRpgNode["timeout"].getString().value(); + if (currRpgNode[schema_.rpg_timeout]) { + auto timeout = currRpgNode[schema_.rpg_timeout].getString().value(); logger_->log_debug("parseRemoteProcessGroup: timeout => [%s]", timeout); auto timeout_value = utils::timeutils::StringToDuration<std::chrono::milliseconds>(timeout); @@ -340,33 +345,33 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq, } } - if (currRpgNode["local network interface"]) { - auto interface = currRpgNode["local network interface"].getString().value(); + if (currRpgNode[schema_.rpg_local_network_interface]) { + auto interface = currRpgNode[schema_.rpg_local_network_interface].getString().value(); logger_->log_debug("parseRemoteProcessGroup: local network interface => [%s]", interface); group->setInterface(interface); } - if (currRpgNode["transport protocol"]) { - auto transport_protocol = currRpgNode["transport protocol"].getString().value(); + if (currRpgNode[schema_.rpg_transport_protocol]) { + auto transport_protocol = currRpgNode[schema_.rpg_transport_protocol].getString().value(); logger_->log_debug("parseRemoteProcessGroup: transport protocol => [%s]", transport_protocol); if (transport_protocol == "HTTP") { group->setTransportProtocol(transport_protocol); - if (currRpgNode["proxy host"]) { - auto http_proxy_host = currRpgNode["proxy host"].getString().value(); + if (currRpgNode[schema_.rpg_proxy_host]) { + auto http_proxy_host = currRpgNode[schema_.rpg_proxy_host].getString().value(); logger_->log_debug("parseRemoteProcessGroup: proxy host => [%s]", http_proxy_host); group->setHttpProxyHost(http_proxy_host); - if 
(currRpgNode["proxy user"]) { - auto http_proxy_username = currRpgNode["proxy user"].getString().value(); + if (currRpgNode[schema_.rpg_proxy_user]) { + auto http_proxy_username = currRpgNode[schema_.rpg_proxy_user].getString().value(); logger_->log_debug("parseRemoteProcessGroup: proxy user => [%s]", http_proxy_username); group->setHttpProxyUserName(http_proxy_username); } - if (currRpgNode["proxy password"]) { - auto http_proxy_password = currRpgNode["proxy password"].getString().value(); + if (currRpgNode[schema_.rpg_proxy_password]) { + auto http_proxy_password = currRpgNode[schema_.rpg_proxy_password].getString().value(); logger_->log_debug("parseRemoteProcessGroup: proxy password => [%s]", http_proxy_password); group->setHttpProxyPassWord(http_proxy_password); } - if (currRpgNode["proxy port"]) { - auto http_proxy_port = currRpgNode["proxy port"].getIntegerAsString().value(); + if (currRpgNode[schema_.rpg_proxy_port]) { + auto http_proxy_port = currRpgNode[schema_.rpg_proxy_port].getIntegerAsString().value(); int32_t port; if (core::Property::StringToInt(http_proxy_port, port)) { logger_->log_debug("parseRemoteProcessGroup: proxy port => [%d]", port); @@ -386,19 +391,19 @@ void StructuredConfiguration::parseRemoteProcessGroup(const Node& rpg_node_seq, group->setTransmitting(true); group->setURL(url); - checkRequiredField(currRpgNode, "Input Ports", CONFIG_REMOTE_PROCESS_GROUP_KEY); - auto inputPorts = currRpgNode["Input Ports"]; + checkRequiredField(currRpgNode, schema_.rpg_input_ports); + auto inputPorts = currRpgNode[schema_.rpg_input_ports]; if (inputPorts && inputPorts.isSequence()) { for (const auto& currPort : inputPorts) { - parsePort(currPort, group.get(), sitetosite::SEND); + parseRPGPort(currPort, group.get(), sitetosite::SEND); } // for node } - auto outputPorts = currRpgNode["Output Ports"]; + auto outputPorts = currRpgNode[schema_.rpg_output_ports]; if (outputPorts && outputPorts.isSequence()) { for (const auto& currPort : outputPorts) { 
logger_->log_debug("Got a current port, iterating..."); - parsePort(currPort, group.get(), sitetosite::RECEIVE); + parseRPGPort(currPort, group.get(), sitetosite::RECEIVE); } // for node } parentGroup->addProcessGroup(std::move(group)); @@ -420,10 +425,10 @@ void StructuredConfiguration::parseProvenanceReporting(const Node& node, core::P auto reportTask = createProvenanceReportTask(); - checkRequiredField(node, "scheduling strategy", CONFIG_PROVENANCE_REPORT_KEY); - auto schedulingStrategyStr = node["scheduling strategy"].getString().value(); - checkRequiredField(node, "scheduling period", CONFIG_PROVENANCE_REPORT_KEY); - auto schedulingPeriodStr = node["scheduling period"].getString().value(); + checkRequiredField(node, schema_.scheduling_strategy); + auto schedulingStrategyStr = node[schema_.scheduling_strategy].getString().value(); + checkRequiredField(node, schema_.scheduling_period); + auto schedulingPeriodStr = node[schema_.scheduling_period].getString().value(); if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(schedulingPeriodStr)) { logger_->log_debug("ProvenanceReportingTask schedulingPeriod %" PRId64 " ns", scheduling_period->count()); @@ -456,10 +461,10 @@ void StructuredConfiguration::parseProvenanceReporting(const Node& node, core::P logger_->log_debug("ProvenanceReportingTask URL %s", urlStr); } } - checkRequiredField(node, "port uuid", CONFIG_PROVENANCE_REPORT_KEY); - auto portUUIDStr = node["port uuid"].getString().value(); - checkRequiredField(node, "batch size", CONFIG_PROVENANCE_REPORT_KEY); - auto batchSizeStr = node["batch size"].getString().value(); + checkRequiredField(node, schema_.provenance_reporting_port_uuid); + auto portUUIDStr = node[schema_.provenance_reporting_port_uuid].getString().value(); + checkRequiredField(node, schema_.provenance_reporting_batch_size); + auto batchSizeStr = node[schema_.provenance_reporting_batch_size].getString().value(); logger_->log_debug("ProvenanceReportingTask 
port uuid %s", portUUIDStr); port_uuid = portUUIDStr; @@ -481,9 +486,9 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser return; } for (const auto& service_node : controller_services_node) { - checkRequiredField(service_node, "name", CONFIG_CONTROLLER_SERVICES_KEY); + checkRequiredField(service_node, schema_.name); - auto type = getRequiredField(service_node, std::vector<std::string>{"class", "type"}, CONFIG_CONTROLLER_SERVICES_KEY); + auto type = getRequiredField(service_node, schema_.type); logger_->log_debug("Using type %s for controller service node", type); std::string fullType = type; @@ -493,8 +498,8 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser type = type.substr(lastOfIdx); } - auto name = service_node["name"].getString().value(); - auto id = getRequiredIdField(service_node, CONFIG_CONTROLLER_SERVICES_KEY); + auto name = service_node[schema_.name].getString().value(); + auto id = getRequiredIdField(service_node); utils::Identifier uuid; uuid = id; @@ -502,11 +507,11 @@ void StructuredConfiguration::parseControllerServices(const Node& controller_ser if (nullptr != controller_service_node) { logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name); controller_service_node->initialize(); - if (Node propertiesNode = service_node["Properties"]) { + if (Node propertiesNode = service_node[schema_.controller_service_properties]) { // we should propagate properties to the node and to the implementation - parsePropertiesNode(propertiesNode, *controller_service_node, name, CONFIG_CONTROLLER_SERVICES_KEY); + parsePropertiesNode(propertiesNode, *controller_service_node, name); if (auto controllerServiceImpl = controller_service_node->getControllerServiceImplementation(); controllerServiceImpl) { - parsePropertiesNode(propertiesNode, *controllerServiceImpl, name, CONFIG_CONTROLLER_SERVICES_KEY); + parsePropertiesNode(propertiesNode, *controllerServiceImpl, name); } } } 
else { @@ -538,7 +543,7 @@ void StructuredConfiguration::parseConnection(const Node& connection_node_seq, c // Default name to be same as ID // If name is specified in configuration, use the value - const auto name = connection_node["name"].getString().value_or(id); + const auto name = connection_node[schema_.name].getString().value_or(id); const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] { logger_->log_debug("Incorrect connection UUID format."); @@ -547,7 +552,7 @@ void StructuredConfiguration::parseConnection(const Node& connection_node_seq, c auto connection = createConnection(name, uuid.value()); logger_->log_debug("Created connection with UUID %s and name %s", id, name); - const StructuredConnectionParser connectionParser(connection_node, name, gsl::not_null<core::ProcessGroup*>{ parent }, logger_); + const StructuredConnectionParser connectionParser(connection_node, name, gsl::not_null<core::ProcessGroup*>{ parent }, logger_, schema_); connectionParser.configureConnectionSourceRelationships(*connection); connection->setMaxQueueSize(connectionParser.getWorkQueueSize()); connection->setMaxQueueDataSize(connectionParser.getWorkQueueDataSize()); @@ -561,7 +566,7 @@ void StructuredConfiguration::parseConnection(const Node& connection_node_seq, c } } -void StructuredConfiguration::parsePort(const Node& port_node, core::ProcessGroup* parent, sitetosite::TransferDirection direction) { +void StructuredConfiguration::parseRPGPort(const Node& port_node, core::ProcessGroup* parent, sitetosite::TransferDirection direction) { utils::Identifier uuid; if (!parent) { @@ -570,9 +575,9 @@ void StructuredConfiguration::parsePort(const Node& port_node, core::ProcessGrou } // Check for required fields - checkRequiredField(port_node, "name", CONFIG_REMOTE_PROCESS_GROUP_KEY); - auto nameStr = port_node["name"].getString().value(); - auto portId = getRequiredIdField(port_node, CONFIG_REMOTE_PROCESS_GROUP_KEY, + checkRequiredField(port_node, schema_.name); + auto 
nameStr = port_node[schema_.name].getString().value(); + auto portId = getRequiredIdField(port_node, "The field 'id' is required for " "the port named '" + nameStr + "' in the Flow Config. If this port " "is an input port for a NiFi Remote Process Group, the port " @@ -597,8 +602,11 @@ void StructuredConfiguration::parsePort(const Node& port_node, core::ProcessGrou // else defaults to RAW // handle port properties - if (Node propertiesNode = port_node["Properties"]) { - parsePropertiesNode(propertiesNode, *port, nameStr, CONFIG_REMOTE_PROCESS_GROUP_KEY); + if (Node propertiesNode = port_node[schema_.rpg_port_properties]) { + parsePropertiesNode(propertiesNode, *port, nameStr); + } else { + parsePropertyNodeElement(minifi::RemoteProcessorGroupPort::portUUID.getName(), port_node[schema_.rpg_port_target_id], *port); + validateComponentProperties(*port, nameStr, port_node.getPath()); } // add processor to parent @@ -606,7 +614,7 @@ void StructuredConfiguration::parsePort(const Node& port_node, core::ProcessGrou parent->addProcessor(std::move(port)); processor.setScheduledState(core::RUNNING); - if (auto tasksNode = port_node["max concurrent tasks"]) { + if (auto tasksNode = port_node[schema_.max_concurrent_tasks]) { std::string rawMaxConcurrentTasks = tasksNode.getIntegerAsString().value(); int32_t maxConcurrentTasks; if (core::Property::StringToInt(rawMaxConcurrentTasks, maxConcurrentTasks)) { @@ -658,7 +666,7 @@ PropertyValue StructuredConfiguration::getValidatedProcessorPropertyForDefaultTy } else if (defaultType == Value::BOOL_TYPE && property_value_node.getBool()) { coercedValue = property_value_node.getBool().value(); } else { - coercedValue = property_value_node.getString().value(); + coercedValue = property_value_node.getScalarAsString().value(); } return coercedValue; } catch (const std::exception& e) { @@ -687,7 +695,7 @@ void StructuredConfiguration::parseSingleProperty(const std::string& property_na throw; } if (!property_set) { - const auto rawValueString 
= property_value_node.getString().value(); + const auto rawValueString = property_value_node.getScalarAsString().value(); auto proc = dynamic_cast<core::Connectable*>(&processor); if (proc) { logger_->log_warn("Received property %s with value %s but is not one of the properties for %s. Attempting to add as dynamic property.", property_name, rawValueString, proc->getName()); @@ -714,7 +722,7 @@ void StructuredConfiguration::parsePropertyNodeElement(const std::string& proper } } -void StructuredConfiguration::parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name, const std::string& section) { +void StructuredConfiguration::parsePropertiesNode(const Node& properties_node, core::ConfigurableComponent& component, const std::string& component_name) { // Treat generically as a node so we can perform inspection on entries to ensure they are populated logger_->log_trace("Entered %s", component_name); for (const auto& property_node : properties_node) { @@ -723,7 +731,7 @@ void StructuredConfiguration::parsePropertiesNode(const Node& properties_node, c parsePropertyNodeElement(propertyName, propertyValueNode, component); } - validateComponentProperties(component, component_name, section); + validateComponentProperties(component, component_name, properties_node.getPath()); } void StructuredConfiguration::parseFunnels(const Node& node, core::ProcessGroup* parent) { @@ -739,7 +747,7 @@ void StructuredConfiguration::parseFunnels(const Node& node, core::ProcessGroup* std::string id = getOrGenerateId(funnel_node); // Default name to be same as ID - const auto name = funnel_node["name"].getString().value_or(id); + const auto name = funnel_node[schema_.name].getString().value_or(id); const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] { logger_->log_debug("Incorrect funnel UUID format."); @@ -767,7 +775,7 @@ void StructuredConfiguration::parsePorts(const flow::Node& node, core::ProcessGr 
std::string id = getOrGenerateId(port_node); // Default name to be same as ID - const auto name = port_node["name"].getString().value_or(id); + const auto name = port_node[schema_.name].getString().value_or(id); const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] { logger_->log_debug("Incorrect port UUID format."); @@ -861,14 +869,14 @@ void StructuredConfiguration::raiseComponentError(const std::string &component_n throw std::invalid_argument(err_msg); } -std::string StructuredConfiguration::getOrGenerateId(const Node& node, const std::string& id_field) { - if (node[id_field]) { - if (auto opt_id_str = node[id_field].getString()) { +std::string StructuredConfiguration::getOrGenerateId(const Node& node) { + if (node[schema_.identifier]) { + if (auto opt_id_str = node[schema_.identifier].getString()) { auto id = opt_id_str.value(); addNewId(id); return id; } - throw std::invalid_argument("getOrGenerateId: idField '" + id_field + "' is expected to contain string."); + throw std::invalid_argument("getOrGenerateId: idField '" + utils::StringUtils::join(",", schema_.identifier) + "' is expected to contain string."); } auto id = id_generator_->generate().to_string(); @@ -876,27 +884,24 @@ std::string StructuredConfiguration::getOrGenerateId(const Node& node, const std return id; } -std::string StructuredConfiguration::getRequiredIdField(const Node& node, std::string_view section, std::string_view error_message) { - checkRequiredField(node, "id", section, error_message); - auto id = node["id"].getString().value(); +std::string StructuredConfiguration::getRequiredIdField(const Node& node, std::string_view error_message) { + checkRequiredField(node, schema_.identifier, error_message); + auto id = node[schema_.identifier].getString().value(); addNewId(id); return id; } -std::string StructuredConfiguration::getOptionalField(const Node& node, const std::string& field_name, const std::string& default_value, const std::string& section, - const std::string& 
info_message) { +std::string StructuredConfiguration::getOptionalField(const Node& node, const std::vector<std::string>& field_name, const std::string& default_value, const std::string& info_message) { std::string infoMessage = info_message; auto result = node[field_name]; if (!result) { if (infoMessage.empty()) { // Build a helpful info message for the user to inform them that a default is being used - infoMessage = - node["name"] ? - "Using default value for optional field '" + field_name + "' in component named '" + node["name"].getString().value() + "'" : - "Using default value for optional field '" + field_name + "' "; - if (!section.empty()) { - infoMessage += " [in '" + section + "' section of configuration file]: "; + infoMessage = "Using default value for optional field '" + utils::StringUtils::join(",", field_name) + "'"; + if (auto name = node["name"]) { + infoMessage += "' in component named '" + name.getString().value() + "'"; } + infoMessage += " [in '" + node.getPath() + "' section of configuration file]: "; infoMessage += default_value; } diff --git a/libminifi/src/core/flow/StructuredConnectionParser.cpp b/libminifi/src/core/flow/StructuredConnectionParser.cpp index f39a3ac8b..a6521884b 100644 --- a/libminifi/src/core/flow/StructuredConnectionParser.cpp +++ b/libminifi/src/core/flow/StructuredConnectionParser.cpp @@ -49,10 +49,9 @@ void StructuredConnectionParser::addFunnelRelationshipToConnection(minifi::Conne void StructuredConnectionParser::configureConnectionSourceRelationships(minifi::Connection& connection) const { // Configure connection source - if (connectionNode_["source relationship name"] && !connectionNode_["source relationship name"].getString().value().empty()) { - addNewRelationshipToConnection(connectionNode_["source relationship name"].getString().value(), connection); - } else if (connectionNode_["source relationship names"]) { - auto relList = connectionNode_["source relationship names"]; + if 
(connectionNode_[schema_.source_relationship] && !connectionNode_[schema_.source_relationship].getString().value().empty()) { + addNewRelationshipToConnection(connectionNode_[schema_.source_relationship].getString().value(), connection); + } else if (auto relList = connectionNode_[schema_.source_relationship_list]) { if (relList.isSequence() && !relList.empty()) { for (const auto &rel : relList) { addNewRelationshipToConnection(rel.getString().value(), connection); @@ -68,7 +67,7 @@ void StructuredConnectionParser::configureConnectionSourceRelationships(minifi:: } uint64_t StructuredConnectionParser::getWorkQueueSize() const { - if (auto max_work_queue_data_size_node = connectionNode_["max work queue size"]) { + if (auto max_work_queue_data_size_node = connectionNode_[schema_.max_queue_size]) { std::string max_work_queue_str = max_work_queue_data_size_node.getIntegerAsString().value(); uint64_t max_work_queue_size; if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) { @@ -81,7 +80,7 @@ uint64_t StructuredConnectionParser::getWorkQueueSize() const { } uint64_t StructuredConnectionParser::getWorkQueueDataSize() const { - const flow::Node max_work_queue_data_size_node = connectionNode_["max work queue data size"]; + const flow::Node max_work_queue_data_size_node = connectionNode_[schema_.max_queue_data_size]; if (max_work_queue_data_size_node) { std::string max_work_queue_str = max_work_queue_data_size_node.getIntegerAsString().value(); uint64_t max_work_queue_data_size = 0; @@ -95,7 +94,7 @@ uint64_t StructuredConnectionParser::getWorkQueueDataSize() const { } uint64_t StructuredConnectionParser::getSwapThreshold() const { - const flow::Node swap_threshold_node = connectionNode_["swap threshold"]; + const flow::Node swap_threshold_node = connectionNode_[schema_.swap_threshold]; if (swap_threshold_node) { auto swap_threshold_str = swap_threshold_node.getString().value(); uint64_t swap_threshold; @@ -109,7 +108,7 @@ uint64_t 
StructuredConnectionParser::getSwapThreshold() const { } utils::Identifier StructuredConnectionParser::getSourceUUID() const { - const flow::Node source_id_node = connectionNode_["source id"]; + const flow::Node source_id_node = connectionNode_[schema_.source_id]; if (source_id_node) { const auto srcUUID = utils::Identifier::parse(source_id_node.getString().value()); if (srcUUID) { @@ -120,8 +119,8 @@ utils::Identifier StructuredConnectionParser::getSourceUUID() const { throw std::invalid_argument("Invalid source id"); } // if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary - checkRequiredField(connectionNode_, "source name", CONFIG_CONNECTIONS_KEY); - const auto connectionSrcProcName = connectionNode_["source name"].getString().value(); + checkRequiredField(connectionNode_, schema_.source_name); + const auto connectionSrcProcName = connectionNode_[schema_.source_name].getString().value(); const auto srcUUID = utils::Identifier::parse(connectionSrcProcName); if (srcUUID && parent_->findProcessorById(srcUUID.value(), ProcessGroup::Traverse::ExcludeChildren)) { // the source name is a remote port id, so use that as the source id @@ -141,7 +140,7 @@ utils::Identifier StructuredConnectionParser::getSourceUUID() const { } utils::Identifier StructuredConnectionParser::getDestinationUUID() const { - const flow::Node destination_id_node = connectionNode_["destination id"]; + const flow::Node destination_id_node = connectionNode_[schema_.destination_id]; if (destination_id_node) { const auto destUUID = utils::Identifier::parse(destination_id_node.getString().value()); if (destUUID) { @@ -153,8 +152,8 @@ utils::Identifier StructuredConnectionParser::getDestinationUUID() const { } // we use the same logic as above for resolving the source processor // for looking up the destination processor in absence of a processor id - checkRequiredField(connectionNode_, "destination name", CONFIG_CONNECTIONS_KEY); - auto 
connectionDestProcName = connectionNode_["destination name"].getString().value(); + checkRequiredField(connectionNode_, schema_.destination_name); + auto connectionDestProcName = connectionNode_[schema_.destination_name].getString().value(); const auto destUUID = utils::Identifier::parse(connectionDestProcName); if (destUUID && parent_->findProcessorById(destUUID.value(), ProcessGroup::Traverse::ExcludeChildren)) { // the destination name is a remote port id, so use that as the dest id @@ -175,7 +174,7 @@ utils::Identifier StructuredConnectionParser::getDestinationUUID() const { std::chrono::milliseconds StructuredConnectionParser::getFlowFileExpiration() const { using namespace std::literals::chrono_literals; - const flow::Node expiration_node = connectionNode_["flowfile expiration"]; + const flow::Node expiration_node = connectionNode_[schema_.flowfile_expiration]; if (!expiration_node) { logger_->log_debug("parseConnection: flowfile expiration is not set, assuming 0 (never expire)"); return 0ms; @@ -196,7 +195,7 @@ std::chrono::milliseconds StructuredConnectionParser::getFlowFileExpiration() co } bool StructuredConnectionParser::getDropEmpty() const { - const flow::Node drop_empty_node = connectionNode_["drop empty"]; + const flow::Node drop_empty_node = connectionNode_[schema_.drop_empty]; if (drop_empty_node) { return utils::StringUtils::toBool(drop_empty_node.getString().value()).value_or(false); } diff --git a/libminifi/src/core/json/JsonConfiguration.cpp b/libminifi/src/core/json/JsonConfiguration.cpp deleted file mode 100644 index 960f9a631..000000000 --- a/libminifi/src/core/json/JsonConfiguration.cpp +++ /dev/null @@ -1,89 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <memory> -#include <vector> -#include <set> -#include <cinttypes> -#include <variant> - -#include "core/json/JsonConfiguration.h" -#include "core/json/JsonNode.h" -#include "core/state/Value.h" -#include "Defaults.h" -#include "utils/TimeUtil.h" - -#include "rapidjson/rapidjson.h" -#include "rapidjson/document.h" - -namespace org::apache::nifi::minifi::core { - -namespace { - -} // namespace - - -JsonConfiguration::JsonConfiguration(ConfigurationContext ctx) - : StructuredConfiguration(([&] { - if (!ctx.path) { - ctx.path = DEFAULT_NIFI_CONFIG_JSON; - } - return std::move(ctx); - })(), - logging::LoggerFactory<JsonConfiguration>::getLogger()) {} - -std::unique_ptr<core::ProcessGroup> JsonConfiguration::getRoot() { - if (!config_path_) { - logger_->log_error("Cannot instantiate flow, no config file is set."); - throw Exception(ExceptionType::FLOW_EXCEPTION, "No config file specified"); - } - const auto configuration = filesystem_->read(config_path_.value()); - if (!configuration) { - // non-existence of flow config file is not a dealbreaker, the caller might fetch it from network - return nullptr; - } - try { - rapidjson::Document doc; - rapidjson::ParseResult res = doc.Parse(configuration->c_str(), configuration->length()); - if (!res) { - throw std::runtime_error("Could not parse json file"); - } - flow::Node root{std::make_shared<JsonNode>(&doc)}; - return 
getRootFrom(root); - } catch(...) { - logger_->log_error("Invalid json configuration file"); - throw; - } -} - -std::unique_ptr<core::ProcessGroup> JsonConfiguration::getRootFromPayload(const std::string &json_config) { - try { - rapidjson::Document doc; - rapidjson::ParseResult res = doc.Parse(json_config.c_str(), json_config.length()); - if (!res) { - throw std::runtime_error("Could not parse json file"); - } - flow::Node root{std::make_shared<JsonNode>(&doc)}; - return getRootFrom(root); - } catch (const std::runtime_error& err) { - logger_->log_error(err.what()); - throw; - } -} - -} // namespace org::apache::nifi::minifi::core diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index 89092eee5..f7bde6a66 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -40,31 +40,11 @@ YamlConfiguration::YamlConfiguration(ConfigurationContext ctx) })(), logging::LoggerFactory<YamlConfiguration>::getLogger()) {} -std::unique_ptr<core::ProcessGroup> YamlConfiguration::getRoot() { - if (!config_path_) { - logger_->log_error("Cannot instantiate flow, no config file is set."); - throw Exception(ExceptionType::FLOW_EXCEPTION, "No config file specified"); - } - const auto configuration = filesystem_->read(config_path_.value()); - if (!configuration) { - // non-existence of flow config file is not a dealbreaker, the caller might fetch it from network - return nullptr; - } - try { - YAML::Node rootYamlNode = YAML::Load(configuration.value()); - flow::Node root{std::make_shared<YamlNode>(rootYamlNode)}; - return getRootFrom(root); - } catch(...) 
{ - logger_->log_error("Invalid yaml configuration file"); - throw; - } -} - std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(std::istream &yamlConfigStream) { try { YAML::Node rootYamlNode = YAML::Load(yamlConfigStream); flow::Node root{std::make_shared<YamlNode>(rootYamlNode)}; - return getRootFrom(root); + return getRootFrom(root, flow::FlowSchema::getDefault()); } catch (const YAML::ParserException &pe) { logger_->log_error(pe.what()); throw; @@ -75,7 +55,7 @@ std::unique_ptr<core::ProcessGroup> YamlConfiguration::getRootFromPayload(const try { YAML::Node rootYamlNode = YAML::Load(yamlConfigPayload); flow::Node root{std::make_shared<YamlNode>(rootYamlNode)}; - return getRootFrom(root); + return getRootFrom(root, flow::FlowSchema::getDefault()); } catch (const YAML::ParserException &pe) { logger_->log_error(pe.what()); throw; diff --git a/libminifi/test/ConfigurationTestController.h b/libminifi/test/ConfigurationTestController.h new file mode 100644 index 000000000..05e33214d --- /dev/null +++ b/libminifi/test/ConfigurationTestController.h @@ -0,0 +1,56 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include <memory> + +#include "TestBase.h" +#include "core/FlowConfiguration.h" +#include "core/RepositoryFactory.h" +#include "core/yaml/YamlConfiguration.h" +#include "core/flow/AdaptiveConfiguration.h" + +class ConfigurationTestController : public TestController { + public: + ConfigurationTestController() { + flow_file_repo_ = core::createRepository("flowfilerepository"); + configuration_ = std::make_shared<minifi::Configure>(); + stream_factory_ = minifi::io::StreamFactory::getInstance(configuration_); + content_repo_ = std::make_shared<core::repository::VolatileContentRepository>(); + + LogTestController::getInstance().setDebug<TestPlan>(); + LogTestController::getInstance().setTrace<core::YamlConfiguration>(); + LogTestController::getInstance().setDebug<core::Processor>(); + LogTestController::getInstance().setTrace<core::flow::AdaptiveConfiguration>(); + } + + core::ConfigurationContext getContext() const { + return core::ConfigurationContext{ + .flow_file_repo = flow_file_repo_, + .content_repo = content_repo_, + .stream_factory = stream_factory_, + .configuration = configuration_ + }; + } + + std::shared_ptr<core::Repository> flow_file_repo_; + std::shared_ptr<minifi::Configure> configuration_; + std::shared_ptr<minifi::io::StreamFactory> stream_factory_; + std::shared_ptr<core::ContentRepository> content_repo_; +}; diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp index 63ee396b2..a3ed6a483 100644 --- a/minifi_main/MiNiFiMain.cpp +++ b/minifi_main/MiNiFiMain.cpp @@ -232,7 +232,7 @@ int main(int argc, char **argv) { std::string graceful_shutdown_seconds; std::string prov_repo_class = "provenancerepository"; std::string flow_repo_class = "flowfilerepository"; - std::string nifi_configuration_class_name = "yamlconfiguration"; + std::string nifi_configuration_class_name = "adaptiveconfiguration"; std::string content_repo_class = "filesystemrepository"; auto log_properties = 
std::make_shared<core::logging::LoggerProperties>();
