This is an automated email from the ASF dual-hosted git repository.
adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 960b6dc MINIFICPP-1325 - Refactor and test YAML connection parsing
960b6dc is described below
commit 960b6dc0cf66c9cb28d97a90e4d9dd920a1af196
Author: Adam Hunyadi <[email protected]>
AuthorDate: Fri Feb 26 12:06:48 2021 +0100
MINIFICPP-1325 - Refactor and test YAML connection parsing
Signed-off-by: Adam Debreceni <[email protected]>
This closes #948
---
.../tests/unit/YamlConfigurationTests.cpp | 203 +++++++--------
.../tests/unit/YamlConnectionParserTest.cpp | 198 +++++++++++++++
libminifi/include/core/FlowFile.h | 2 +-
libminifi/include/core/ProcessGroup.h | 2 +-
libminifi/include/core/yaml/CheckRequiredField.h | 59 +++++
libminifi/include/core/yaml/YamlConfiguration.h | 57 +----
libminifi/include/core/yaml/YamlConnectionParser.h | 66 +++++
libminifi/include/utils/Id.h | 1 -
libminifi/include/utils/TestUtils.h | 8 +
libminifi/src/core/ProcessGroup.cpp | 8 +-
libminifi/src/core/yaml/CheckRequiredField.cpp | 59 +++++
libminifi/src/core/yaml/YamlConfiguration.cpp | 276 ++++++---------------
libminifi/src/core/yaml/YamlConnectionParser.cpp | 182 ++++++++++++++
main/MainHelper.h | 7 -
14 files changed, 764 insertions(+), 364 deletions(-)
diff --git a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
index c9ce42c..157fd03 100644
--- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
@@ -19,9 +19,12 @@
#include <map>
#include <memory>
#include "core/repository/VolatileContentRepository.h"
+#include "core/ProcessGroup.h"
#include "core/RepositoryFactory.h"
#include "core/yaml/YamlConfiguration.h"
+#include "TailFile.h"
#include "TestBase.h"
+#include "utils/TestUtils.h"
TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
TestController test_controller;
@@ -35,105 +38,105 @@ TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
SECTION("loading YAML without optional component IDs works") {
static const std::string CONFIG_YAML_WITHOUT_IDS = ""
- "MiNiFi Config Version: 1\n"
- "Flow Controller:\n"
- " name: MiNiFi Flow\n"
- " comment:\n"
- "\n"
- "Core Properties:\n"
- " flow controller graceful shutdown period: 10 sec\n"
- " flow service write delay interval: 500 ms\n"
- " administrative yield duration: 30 sec\n"
- " bored yield duration: 10 millis\n"
- "\n"
- "FlowFile Repository:\n"
- " partitions: 256\n"
- " checkpoint interval: 2 mins\n"
- " always sync: false\n"
- " Swap:\n"
- " threshold: 20000\n"
- " in period: 5 sec\n"
- " in threads: 1\n"
- " out period: 5 sec\n"
- " out threads: 4\n"
- "\n"
- "Provenance Repository:\n"
- " provenance rollover time: 1 min\n"
- "\n"
- "Content Repository:\n"
- " content claim max appendable size: 10 MB\n"
- " content claim max flow files: 100\n"
- " always sync: false\n"
- "\n"
- "Component Status Repository:\n"
- " buffer size: 1440\n"
- " snapshot frequency: 1 min\n"
- "\n"
- "Security Properties:\n"
- " keystore: /tmp/ssl/localhost-ks.jks\n"
- " keystore type: JKS\n"
- " keystore password: localtest\n"
- " key password: localtest\n"
- " truststore: /tmp/ssl/localhost-ts.jks\n"
- " truststore type: JKS\n"
- " truststore password: localtest\n"
- " ssl protocol: TLS\n"
- " Sensitive Props:\n"
- " key:\n"
- " algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL\n"
- " provider: BC\n"
- "\n"
- "Processors:\n"
- " - name: TailFile\n"
- " class: org.apache.nifi.processors.standard.TailFile\n"
- " max concurrent tasks: 1\n"
- " scheduling strategy: TIMER_DRIVEN\n"
- " scheduling period: 1 sec\n"
- " penalization period: 30 sec\n"
- " yield period: 1 sec\n"
- " run duration nanos: 0\n"
- " auto-terminated relationships list:\n"
- " Properties:\n"
- " File to Tail: logs/minifi-app.log\n"
- " Rolling Filename Pattern: minifi-app*\n"
- " Initial Start Position: Beginning of File\n"
- "\n"
- "Connections:\n"
- " - name: TailToS2S\n"
- " source name: TailFile\n"
- " source relationship name: success\n"
- " destination name: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n"
- " max work queue size: 0\n"
- " max work queue data size: 1 MB\n"
- " flowfile expiration: 60 sec\n"
- " queue prioritizer class:
org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer\n"
- "\n"
- "Remote Processing Groups:\n"
- " - name: NiFi Flow\n"
- " comment:\n"
- " url: https://localhost:8090/nifi\n"
- " timeout: 30 secs\n"
- " yield period: 10 sec\n"
- " Input Ports:\n"
- " - id: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n"
- " name: tailed log\n"
- " comments:\n"
- " max concurrent tasks: 1\n"
- " use compression: false\n"
- "\n"
- "Provenance Reporting:\n"
- " comment:\n"
- " scheduling strategy: TIMER_DRIVEN\n"
- " scheduling period: 30 sec\n"
- " host: localhost\n"
- " port name: provenance\n"
- " port: 8090\n"
- " port uuid: 2f389b8d-83f2-48d3-b465-048f28a1cb56\n"
- " url: https://localhost:8090/\n"
- " originating url: http://${hostname(true)}:8081/nifi\n"
- " use compression: true\n"
- " timeout: 30 secs\n"
- " batch size: 1000";
+ "MiNiFi Config Version: 1\n"
+ "Flow Controller:\n"
+ " name: MiNiFi Flow\n"
+ " comment:\n"
+ "\n"
+ "Core Properties:\n"
+ " flow controller graceful shutdown period: 10 sec\n"
+ " flow service write delay interval: 500 ms\n"
+ " administrative yield duration: 30 sec\n"
+ " bored yield duration: 10 millis\n"
+ "\n"
+ "FlowFile Repository:\n"
+ " partitions: 256\n"
+ " checkpoint interval: 2 mins\n"
+ " always sync: false\n"
+ " Swap:\n"
+ " threshold: 20000\n"
+ " in period: 5 sec\n"
+ " in threads: 1\n"
+ " out period: 5 sec\n"
+ " out threads: 4\n"
+ "\n"
+ "Provenance Repository:\n"
+ " provenance rollover time: 1 min\n"
+ "\n"
+ "Content Repository:\n"
+ " content claim max appendable size: 10 MB\n"
+ " content claim max flow files: 100\n"
+ " always sync: false\n"
+ "\n"
+ "Component Status Repository:\n"
+ " buffer size: 1440\n"
+ " snapshot frequency: 1 min\n"
+ "\n"
+ "Security Properties:\n"
+ " keystore: /tmp/ssl/localhost-ks.jks\n"
+ " keystore type: JKS\n"
+ " keystore password: localtest\n"
+ " key password: localtest\n"
+ " truststore: /tmp/ssl/localhost-ts.jks\n"
+ " truststore type: JKS\n"
+ " truststore password: localtest\n"
+ " ssl protocol: TLS\n"
+ " Sensitive Props:\n"
+ " key:\n"
+ " algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL\n"
+ " provider: BC\n"
+ "\n"
+ "Processors:\n"
+ " - name: TailFile\n"
+ " class: org.apache.nifi.processors.standard.TailFile\n"
+ " max concurrent tasks: 1\n"
+ " scheduling strategy: TIMER_DRIVEN\n"
+ " scheduling period: 1 sec\n"
+ " penalization period: 30 sec\n"
+ " yield period: 1 sec\n"
+ " run duration nanos: 0\n"
+ " auto-terminated relationships list:\n"
+ " Properties:\n"
+ " File to Tail: logs/minifi-app.log\n"
+ " Rolling Filename Pattern: minifi-app*\n"
+ " Initial Start Position: Beginning of File\n"
+ "\n"
+ "Connections:\n"
+ " - name: TailToS2S\n"
+ " source name: TailFile\n"
+ " source relationship name: success\n"
+ " destination name: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n"
+ " max work queue size: 0\n"
+ " max work queue data size: 1 MB\n"
+ " flowfile expiration: 60 sec\n"
+ " queue prioritizer class:
org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer\n"
+ "\n"
+ "Remote Processing Groups:\n"
+ " - name: NiFi Flow\n"
+ " comment:\n"
+ " url: https://localhost:8090/nifi\n"
+ " timeout: 30 secs\n"
+ " yield period: 10 sec\n"
+ " Input Ports:\n"
+ " - id: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n"
+ " name: tailed log\n"
+ " comments:\n"
+ " max concurrent tasks: 1\n"
+ " use compression: false\n"
+ "\n"
+ "Provenance Reporting:\n"
+ " comment:\n"
+ " scheduling strategy: TIMER_DRIVEN\n"
+ " scheduling period: 30 sec\n"
+ " host: localhost\n"
+ " port name: provenance\n"
+ " port: 8090\n"
+ " port uuid: 2f389b8d-83f2-48d3-b465-048f28a1cb56\n"
+ " url: https://localhost:8090/\n"
+ " originating url: http://${hostname(true)}:8081/nifi\n"
+ " use compression: true\n"
+ " timeout: 30 secs\n"
+ " batch size: 1000";
std::istringstream configYamlStream(CONFIG_YAML_WITHOUT_IDS);
std::unique_ptr<core::ProcessGroup> rootFlowConfig =
yamlConfig.getYamlRoot(configYamlStream);
@@ -494,7 +497,7 @@ Processors:
REQUIRE(rootFlowConfig);
REQUIRE(rootFlowConfig->findProcessorByName("PutFile"));
- utils::Identifier uuid =
rootFlowConfig->findProcessorByName("PutFile")->getUUID();
+ const utils::Identifier uuid =
rootFlowConfig->findProcessorByName("PutFile")->getUUID();
REQUIRE(uuid);
REQUIRE(!rootFlowConfig->findProcessorByName("PutFile")->getUUIDStr().empty());
diff --git a/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
new file mode 100644
index 0000000..5925bb7
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
@@ -0,0 +1,198 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/yaml/YamlConnectionParser.h"
+
+#include "core/yaml/YamlConfiguration.h"
+#include "TailFile.h"
+#include "TestBase.h"
+#include "utils/TestUtils.h"
+
+namespace {
+
+using org::apache::nifi::minifi::core::yaml::YamlConnectionParser;
+using org::apache::nifi::minifi::core::YamlConfiguration;
+
+// Unit tests for YamlConnectionParser: each getter is exercised against small hand-written YAML
+// snippets, including the error paths for missing keys, keys without values and malformed values.
+// Catch2 re-runs the TEST_CASE body from the top for every leaf SECTION, so `parent` (and the
+// processors added to it) are freshly constructed for each leaf section.
+TEST_CASE("Connections components are parsed from yaml", "[YamlConfiguration]") {
+  const std::shared_ptr<logging::Logger> logger = logging::LoggerFactory<YamlConfiguration>::getLogger();
+  core::ProcessGroup parent(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root");
+  gsl::not_null<core::ProcessGroup*> parent_ptr{ &parent };
+
+  SECTION("Source relationships are read") {
+    const auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "name");
+    std::string serialized_yaml;
+    std::set<org::apache::nifi::minifi::core::Relationship> expectations;
+    SECTION("Single relationship name") {
+      serialized_yaml = std::string { "source relationship name: success\n" };
+      expectations = { { "success", "" } };
+    }
+    SECTION("List of relationship names") {
+      serialized_yaml = std::string {
+          "source relationship names:\n"
+          "- success\n"
+          "- failure\n"
+          "- something_else\n" };
+      expectations = { { "success", "" }, { "failure", "" }, { "something_else", "" } };
+    }
+    YAML::Node connection_node = YAML::Load(serialized_yaml);
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(connection);
+    const std::set<core::Relationship>& relationships = connection->getRelationships();
+    REQUIRE(expectations == relationships);
+  }
+  SECTION("Queue size limits are read") {
+    YAML::Node connection_node = YAML::Load(std::string {
+        "max work queue size: 231\n"
+        "max work queue data size: 12 MB\n" });
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    REQUIRE(231 == yaml_connection_parser.getWorkQueueSizeFromYaml());
+    REQUIRE(12582912 == yaml_connection_parser.getWorkQueueDataSizeFromYaml());  // 12 * 1024 * 1024 B
+  }
+  SECTION("Source and destination names and uuids are read") {
+    const utils::Identifier expected_source_id = utils::generateUUID();
+    const utils::Identifier expected_destination_id = utils::generateUUID();
+    std::string serialized_yaml;
+    parent.addProcessor(std::static_pointer_cast<core::Processor>(std::make_shared<processors::TailFile>("TailFile_1", expected_source_id)));
+    parent.addProcessor(std::static_pointer_cast<core::Processor>(std::make_shared<processors::TailFile>("TailFile_2", expected_destination_id)));
+    SECTION("Directly from configuration") {
+      serialized_yaml = std::string {
+          "source id: " + expected_source_id.to_string() + "\n"
+          "destination id: " + expected_destination_id.to_string() + "\n" };
+    }
+    SECTION("Using UUID as remote processing group id") {
+      serialized_yaml = std::string {
+          "source name: " + expected_source_id.to_string() + "\n"
+          "destination name: " + expected_destination_id.to_string() + "\n" };
+    }
+    SECTION("Via processor name lookup") {
+      serialized_yaml = std::string {
+          "source name: TailFile_1\n"
+          "destination name: TailFile_2\n" };
+    }
+    YAML::Node connection_node = YAML::Load(serialized_yaml);
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    REQUIRE(expected_source_id == yaml_connection_parser.getSourceUUIDFromYaml());
+    REQUIRE(expected_destination_id == yaml_connection_parser.getDestinationUUIDFromYaml());
+  }
+  SECTION("Flow file expiration is read") {
+    YAML::Node connection_node = YAML::Load(std::string {
+        "flowfile expiration: 2 min\n" });
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+    REQUIRE(120000 == yaml_connection_parser.getFlowFileExpirationFromYaml());  // 2 * 60 * 1000 ms
+  }
+  SECTION("Drop empty value is read") {
+    SECTION("When config contains true value") {
+      YAML::Node connection_node = YAML::Load(std::string {
+          "drop empty: true\n" });
+      YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+      REQUIRE(true == yaml_connection_parser.getDropEmptyFromYaml());
+    }
+    SECTION("When config contains false value") {
+      YAML::Node connection_node = YAML::Load(std::string {
+          "drop empty: false\n" });
+      YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+      REQUIRE(false == yaml_connection_parser.getDropEmptyFromYaml());
+    }
+  }
+  SECTION("Errors are handled properly when configuration lines are missing") {
+    const auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "name");
+    SECTION("With empty configuration") {
+      YAML::Node connection_node = YAML::Load(std::string(""));
+      YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+      CHECK_THROWS(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(connection));
+      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueSizeFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueDataSizeFromYaml());
+      CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml());
+      CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getFlowFileExpirationFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getDropEmptyFromYaml());
+    }
+    SECTION("With a configuration that lists keys but has no assigned values") {
+      SECTION("Single relationship name left empty") {
+        // NOTE(review): this snippet sets neither "source relationship name" nor
+        // "source relationship names"; confirm whether an empty "source relationship name:"
+        // entry was the intended input for this section.
+        YAML::Node connection_node = YAML::Load(std::string {
+            "source name: \n"
+            "destination name: \n" });
+        YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+        // This seems incorrect, but we do not want to ruin backward compatibility
+        CHECK_NOTHROW(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(connection));
+      }
+      SECTION("List of relationship names contains empty item") {
+        YAML::Node connection_node = YAML::Load(std::string {
+            "source relationship names:\n"
+            "- \n" });
+        YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+        CHECK_THROWS(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(connection));
+      }
+      SECTION("Source and destination lookup via id") {
+        YAML::Node connection_node = YAML::Load(std::string {
+            "source id: \n"
+            "destination id: \n" });
+        YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+        CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml());
+        CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml());
+      }
+      SECTION("Source and destination lookup via name") {
+        YAML::Node connection_node = YAML::Load(std::string {
+            "source name: \n"
+            "destination name: \n" });
+        YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+        CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml());
+        CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml());
+      }
+      SECTION("Queue limits and configuration") {
+        YAML::Node connection_node = YAML::Load(std::string {
+            "max work queue size: \n"
+            "max work queue data size: \n"
+            "flowfile expiration: \n"
+            "drop empty: \n"});
+        YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+        CHECK_THROWS(yaml_connection_parser.getWorkQueueSizeFromYaml());
+        CHECK_THROWS(yaml_connection_parser.getWorkQueueDataSizeFromYaml());
+        CHECK_THROWS(yaml_connection_parser.getFlowFileExpirationFromYaml());
+        CHECK_THROWS(yaml_connection_parser.getDropEmptyFromYaml());
+      }
+    }
+    SECTION("With a configuration that has values of incorrect format") {
+      YAML::Node connection_node = YAML::Load(std::string {
+          "max work queue size: 2 KB\n"
+          "max work queue data size: 10 Incorrect\n"
+          "flowfile expiration: 12\n"
+          "drop empty: sup\n"});
+      YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+      // This seems incorrect, but we do not want to ruin backward compatibility
+      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueSizeFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueDataSizeFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getFlowFileExpirationFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getDropEmptyFromYaml());
+    }
+    SECTION("Known incorrect formats that behave strangely") {
+      YAML::Node connection_node = YAML::Load(std::string {
+          "max work queue data size: 2 Baby Pandas (img, 20 MB) that are cared for by a group of 30 giraffes\n"
+          "flowfile expiration: 0\n"
+          "drop empty: NULL\n"});
+      YamlConnectionParser yaml_connection_parser(connection_node, "test_node", parent_ptr, logger);
+      REQUIRE(2 == yaml_connection_parser.getWorkQueueDataSizeFromYaml());
+      REQUIRE(0 == yaml_connection_parser.getFlowFileExpirationFromYaml());
+      CHECK_THROWS(yaml_connection_parser.getDropEmptyFromYaml());
+    }
+  }
+}
+
+}  // namespace
diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h
index d2924bc..ca643ad 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -186,7 +186,7 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
/**
* Set the size of this record.
- * @param size size of record to set.Ï
+ * @param size size of record to set.
*/
void setSize(const uint64_t size) {
size_ = size;
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index bb98c37..fad30a7 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -206,7 +206,7 @@ class ProcessGroup : public CoreComponent {
* @param nodeId node identifier
* @param node controller service node.
*/
- void addControllerService(const std::string &nodeId,
std::shared_ptr<core::controller::ControllerServiceNode> &node);
+ void addControllerService(const std::string &nodeId, const
std::shared_ptr<core::controller::ControllerServiceNode> &node);
/**
* Find controllerservice node will search child groups until the nodeId is
found.
diff --git a/libminifi/include/core/yaml/CheckRequiredField.h b/libminifi/include/core/yaml/CheckRequiredField.h
new file mode 100644
index 0000000..7029166
--- /dev/null
+++ b/libminifi/include/core/yaml/CheckRequiredField.h
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <memory>
+
+#include "core/logging/LoggerConfiguration.h"
+#include "yaml-cpp/yaml.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace yaml {
+
+/**
+ * Helper for verifying that a required field is present in a YAML::Node.
+ *
+ * If 'fieldName' is not present in '*yamlNode', an error message is logged
+ * and an std::invalid_argument exception is thrown indicating the absence
+ * of the required field in the YAML node.
+ *
+ * @param yamlNode     the YAML node to check
+ * @param fieldName    the required field key
+ * @param logger       the logger supplied by the caller; presumably the one the
+ *                     error message above is written to (confirm in the .cpp)
+ * @param yamlSection  [optional] the top level section of the YAML config for
+ *                     the yamlNode; used to generate a useful error message
+ *                     for troubleshooting
+ * @param errorMessage [optional] the error message string to use if the
+ *                     required field is missing; if not provided, a default
+ *                     error message is generated
+ *
+ * @throws std::invalid_argument if the required field 'fieldName' is
+ *         not present in 'yamlNode'
+ */
+void checkRequiredField(
+    const YAML::Node *yamlNode, const std::string &fieldName, const std::shared_ptr<logging::Logger>& logger,
+    const std::string &yamlSection = "", const std::string &errorMessage = "");
+
+} // namespace yaml
+} // namespace core
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h
index 83aa204..8342ccd 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -34,6 +34,8 @@
#include "utils/OptionalUtils.h"
#include "yaml-cpp/yaml.h"
+class YamlConfigurationTestAccessor;
+
namespace org {
namespace apache {
namespace nifi {
@@ -42,7 +44,6 @@ namespace core {
#define CONFIG_YAML_FLOW_CONTROLLER_KEY "Flow Controller"
#define CONFIG_YAML_PROCESSORS_KEY "Processors"
-#define CONFIG_YAML_CONNECTIONS_KEY "Connections"
#define CONFIG_YAML_CONTROLLER_SERVICES_KEY "Controller Services"
#define CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY "Remote Processing Groups"
#define CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3 "Remote Process Groups"
@@ -155,36 +156,7 @@ class YamlConfiguration : public FlowConfiguration {
* @return the root ProcessGroup node of the flow
* configuration tree
*/
- std::unique_ptr<core::ProcessGroup> getYamlRoot(YAML::Node *rootYamlNode) {
- YAML::Node rootYaml = *rootYamlNode;
- YAML::Node flowControllerNode = rootYaml[CONFIG_YAML_FLOW_CONTROLLER_KEY];
- YAML::Node processorsNode = rootYaml[CONFIG_YAML_PROCESSORS_KEY];
- YAML::Node connectionsNode = rootYaml[CONFIG_YAML_CONNECTIONS_KEY];
- YAML::Node controllerServiceNode =
rootYaml[CONFIG_YAML_CONTROLLER_SERVICES_KEY];
- YAML::Node remoteProcessingGroupsNode =
rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
-
- if (!remoteProcessingGroupsNode) {
- remoteProcessingGroupsNode =
rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3];
- }
-
- YAML::Node provenanceReportNode =
rootYaml[CONFIG_YAML_PROVENANCE_REPORT_KEY];
-
- parseControllerServices(&controllerServiceNode);
- // Create the root process group
- core::ProcessGroup *root = parseRootProcessGroupYaml(flowControllerNode);
- parseProcessorNodeYaml(processorsNode, root);
- parseRemoteProcessGroupYaml(&remoteProcessingGroupsNode, root);
- parseConnectionYaml(&connectionsNode, root);
- parseProvenanceReportingYaml(&provenanceReportNode, root);
-
- // set the controller services into the root group.
- for (auto controller_service :
controller_services_->getAllControllerServices()) {
- root->addControllerService(controller_service->getName(),
controller_service);
- root->addControllerService(controller_service->getUUIDStr(),
controller_service);
- }
-
- return std::unique_ptr<core::ProcessGroup>(root);
- }
+ std::unique_ptr<core::ProcessGroup> getYamlRoot(YAML::Node *rootYamlNode);
/**
* Parses a processor from its corresponding YAML config node and adds
@@ -220,7 +192,7 @@ class YamlConfiguration : public FlowConfiguration {
* @param rootNode
* @return
*/
- core::ProcessGroup *parseRootProcessGroupYaml(YAML::Node rootNode);
+ std::unique_ptr<core::ProcessGroup> parseRootProcessGroupYaml(YAML::Node
rootNode);
// Process Property YAML
void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node,
std::shared_ptr<core::Processor> processor);
@@ -230,7 +202,6 @@ class YamlConfiguration : public FlowConfiguration {
* @param parent parent process group.
*/
void parseControllerServices(YAML::Node *controllerServicesNode);
- // Process connection YAML
/**
* Parses the Connections section of a configuration YAML.
@@ -293,26 +264,6 @@ class YamlConfiguration : public FlowConfiguration {
std::string getOrGenerateId(YAML::Node *yamlNode, const std::string &idField
= "id");
/**
- * This is a helper function for verifying the existence of a required
- * field in a YAML::Node object. If the field is not present, an error
- * message will be logged and a std::invalid_argument exception will be
- * thrown indicating the absence of the required field in the YAML node.
- *
- * @param yamlNode the YAML node to check
- * @param fieldName the required field key
- * @param yamlSection [optional] the top level section of the YAML config
- * for the yamlNode. This is used fpr generating a
- * useful error message for troubleshooting.
- * @param errorMessage [optional] the error message string to use if
- * the required field is missing. If not provided,
- * a default error message will be generated.
- *
- * @throws std::invalid_argument if the required field 'fieldName' is
- * not present in 'yamlNode'
- */
- void checkRequiredField(YAML::Node *yamlNode, const std::string &fieldName,
const std::string &yamlSection = "", const std::string &errorMessage = "");
-
- /**
* This is a helper function for getting an optional value, if it exists.
* If it does not exist, returns the provided default value.
*
diff --git a/libminifi/include/core/yaml/YamlConnectionParser.h b/libminifi/include/core/yaml/YamlConnectionParser.h
new file mode 100644
index 0000000..512b928
--- /dev/null
+++ b/libminifi/include/core/yaml/YamlConnectionParser.h
@@ -0,0 +1,66 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "core/ProcessGroup.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#include "yaml-cpp/yaml.h"
+#include "utils/gsl.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace yaml {
+
+/**
+ * Extracts the settings of a single entry of the "Connections" section of a flow configuration YAML.
+ *
+ * The parser keeps its own copies of the YAML node handle and of the name it was constructed with,
+ * so it stays valid when the constructor arguments are temporaries (e.g. a string literal as `name`).
+ */
+class YamlConnectionParser {
+ public:
+  static constexpr const char* CONFIG_YAML_CONNECTIONS_KEY{ "Connections" };
+
+  /**
+   * @param connectionNode YAML node describing one connection entry
+   * @param name name of the connection; NOTE(review): presumably used for log/error messages only
+   * @param parent process group in which source/destination processors are looked up by name
+   * @param logger logger used while parsing
+   */
+  explicit YamlConnectionParser(const YAML::Node& connectionNode, const std::string& name, gsl::not_null<core::ProcessGroup*> parent,
+      const std::shared_ptr<logging::Logger>& logger) :
+      connectionNode_(connectionNode),
+      name_(name),
+      parent_(parent),
+      logger_(logger) {}
+
+  // Adds the relationship(s) given by "source relationship name" / "source relationship names" to `connection`.
+  void configureConnectionSourceRelationshipsFromYaml(const std::shared_ptr<minifi::Connection>& connection) const;
+  // Value of "max work queue size".
+  uint64_t getWorkQueueSizeFromYaml() const;
+  // Value of "max work queue data size" in bytes (e.g. "12 MB" -> 12582912).
+  uint64_t getWorkQueueDataSizeFromYaml() const;
+  // Taken from "source id", or from "source name" (either a UUID or the name of a processor in `parent`).
+  utils::Identifier getSourceUUIDFromYaml() const;
+  // Same as getSourceUUIDFromYaml(), using the "destination id" / "destination name" keys.
+  utils::Identifier getDestinationUUIDFromYaml() const;
+  // Value of "flowfile expiration" in milliseconds (e.g. "2 min" -> 120000).
+  uint64_t getFlowFileExpirationFromYaml() const;
+  // Value of "drop empty".
+  bool getDropEmptyFromYaml() const;
+
+ private:
+  // Held by value, not by reference: copying a YAML::Node only copies a handle to the same underlying
+  // node, and owning the std::string avoids a dangling reference when the caller passed a temporary
+  // (a `const std::string&` parameter bound to a temporary only lives until the end of the constructor call).
+  const YAML::Node connectionNode_;
+  const std::string name_;
+  gsl::not_null<core::ProcessGroup*> parent_;
+  const std::shared_ptr<logging::Logger> logger_;
+};
+
+} // namespace yaml
+} // namespace core
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/libminifi/include/utils/Id.h b/libminifi/include/utils/Id.h
index e724ace..687d19f 100644
--- a/libminifi/include/utils/Id.h
+++ b/libminifi/include/utils/Id.h
@@ -54,7 +54,6 @@ namespace utils {
class Identifier {
friend struct IdentifierTestAccessor;
- static constexpr const char* UUID_FORMAT_STRING =
"%02hhx%02hhx%02hhx%02hhx-%02hhx%02hhx-%02hhx%02hhx-%02hhx%02hhx-%02hhx%02hhx%02hhx%02hhx%02hhx%02hhx";
static constexpr const char* hex_lut = "0123456789abcdef";
public:
diff --git a/libminifi/include/utils/TestUtils.h b/libminifi/include/utils/TestUtils.h
index f62cef8..88c8d43 100644
--- a/libminifi/include/utils/TestUtils.h
+++ b/libminifi/include/utils/TestUtils.h
@@ -19,10 +19,12 @@
#pragma once
#include <string>
+#include <memory>
#include "../../test/TestBase.h"
#include "utils/file/FileUtils.h"
#include "utils/Environment.h"
+#include "utils/Id.h"
#include "utils/TimeUtil.h"
namespace org {
@@ -66,6 +68,12 @@ std::string getFileContent(const std::string& file_name) {
return file_content;
}
+/**
+ * Returns a freshly generated Identifier, for use in tests.
+ *
+ * The IdGenerator is fetched once (function-local static) and reused on every call.
+ * Declared `inline` because it is defined in a header: without it, including TestUtils.h from more
+ * than one translation unit of the same binary would be an ODR violation (duplicate symbol at link time).
+ */
+inline Identifier generateUUID() {
+  // TODO(hunyadi): Will make the Id generator manage lifetime using a unique_ptr and return a raw ptr on access
+  static std::shared_ptr<utils::IdGenerator> id_generator = utils::IdGenerator::getIdGenerator();
+  return id_generator->generate();
+}
+
class ManualClock : public timeutils::Clock {
public:
std::chrono::milliseconds timeSinceEpoch() const override { return time_; }
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 892cdb1..ba28246 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -272,7 +272,7 @@ std::shared_ptr<Processor> ProcessGroup::findProcessorByName(const std::string &
return findProcessor(name_matches);
}
+// Registers a controller service node under `nodeId` in this group's controller_service_map_.
+// The node is taken by const reference so callers can pass temporaries and const handles.
-void ProcessGroup::addControllerService(const std::string &nodeId,
std::shared_ptr<core::controller::ControllerServiceNode> &node) {
+void ProcessGroup::addControllerService(const std::string &nodeId, const
std::shared_ptr<core::controller::ControllerServiceNode> &node) {
controller_service_map_.put(nodeId, node);
}
@@ -352,11 +352,13 @@ void ProcessGroup::addConnection(const
std::shared_ptr<Connection>& connection)
connections_.insert(connection);
logger_->log_debug("Add connection %s into process group %s",
connection->getName(), name_);
std::shared_ptr<Processor> source =
this->findProcessorById(connection->getSourceUUID());
- if (source)
+ if (source) {
source->addConnection(connection);
+ }
std::shared_ptr<Processor> destination =
this->findProcessorById(connection->getDestinationUUID());
- if (destination && destination != source)
+ if (destination && destination != source) {
destination->addConnection(connection);
+ }
}
}
diff --git a/libminifi/src/core/yaml/CheckRequiredField.cpp
b/libminifi/src/core/yaml/CheckRequiredField.cpp
new file mode 100644
index 0000000..9ea47d7
--- /dev/null
+++ b/libminifi/src/core/yaml/CheckRequiredField.cpp
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdexcept>
+
+#include "core/yaml/CheckRequiredField.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace yaml {
+
+/**
+ * Verifies that `yamlNode` contains the field `fieldName`.
+ *
+ * If the field is missing, the error is logged through `logger` and std::invalid_argument is thrown.
+ * When `errorMessage` is empty, a descriptive message is generated instead. It mentions the
+ * component's "name" (if the node has one), `yamlSection` (if non-empty) and the node's
+ * line/column/position in the configuration file (if the mark is known).
+ *
+ * @param yamlNode     node expected to contain the field (dereferenced; must not be null)
+ * @param fieldName    key of the required field
+ * @param logger       logger used to report the error
+ * @param yamlSection  configuration section name, used only in the generated message
+ * @param errorMessage custom message; when empty, a default one is generated
+ * @throws std::invalid_argument if the field is missing
+ */
+void checkRequiredField(const YAML::Node *yamlNode, const std::string &fieldName, const std::shared_ptr<logging::Logger>& logger, const std::string &yamlSection, const std::string &errorMessage) {
+  if (yamlNode->as<YAML::Node>()[fieldName]) {
+    return;
+  }
+  std::string errMsg = errorMessage;
+  if (errMsg.empty()) {
+    // Build a helpful error message for the user so they can fix the
+    // invalid YAML config file, using the component name if present
+    const YAML::Node name_node = yamlNode->as<YAML::Node>()["name"];
+    errMsg =
+        name_node ?
+        "Unable to parse configuration file for component named '" + name_node.as<std::string>() + "' as required field '" + fieldName + "' is missing" :
+        "Unable to parse configuration file as required field '" + fieldName + "' is missing";
+    if (!yamlSection.empty()) {
+      errMsg += " [in '" + yamlSection + "' section of configuration file]";
+    }
+    const YAML::Mark mark = yamlNode->Mark();
+    if (!mark.is_null()) {
+      errMsg += " [line:column, pos at " + std::to_string(mark.line) + ":" + std::to_string(mark.column) + ", " + std::to_string(mark.pos) + "]";
+    }
+  }
+  // The message may contain text taken from the configuration file (e.g. the component name),
+  // which can include '%' characters: pass it as an argument, never as the format string.
+  logger->log_error("%s", errMsg);
+  throw std::invalid_argument(errMsg);
+}
+
+} // namespace yaml
+} // namespace core
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp
b/libminifi/src/core/yaml/YamlConfiguration.cpp
index dbb2b4f..111dd8c 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -22,6 +22,8 @@
#include <cinttypes>
#include "core/yaml/YamlConfiguration.h"
+#include "core/yaml/CheckRequiredField.h"
+#include "core/yaml/YamlConnectionParser.h"
#include "core/state/Value.h"
#include "Defaults.h"
@@ -46,11 +48,11 @@ YamlConfiguration::YamlConfiguration(const
std::shared_ptr<core::Repository>& re
stream_factory_(stream_factory),
logger_(logging::LoggerFactory<YamlConfiguration>::getLogger()) {}
-core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node
rootFlowNode) {
+std::unique_ptr<core::ProcessGroup>
YamlConfiguration::parseRootProcessGroupYaml(YAML::Node rootFlowNode) {
utils::Identifier uuid;
int version = 0;
- checkRequiredField(&rootFlowNode, "name",
+ yaml::checkRequiredField(&rootFlowNode, "name", logger_,
CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
std::string flowName = rootFlowNode["name"].as<std::string>();
@@ -88,9 +90,40 @@ core::ProcessGroup
*YamlConfiguration::parseRootProcessGroupYaml(YAML::Node root
}
}
- return group.release();
+ return group;
}
+/**
+ * Builds the root process group from the top-level node of a parsed flow configuration.
+ *
+ * Controller services are parsed first. Then the root process group is created from the flow
+ * controller section, and processors, remote process groups, connections and provenance
+ * reporting are parsed into it, in that order. Finally every controller service returned by
+ * controller_services_->getAllControllerServices() is registered on the root group twice:
+ * once under its name and once under its UUID string.
+ *
+ * @param rootYamlNode top-level configuration node (dereferenced unconditionally; must not be null)
+ * @return the populated root process group
+ */
+std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(YAML::Node *rootYamlNode) {
+  YAML::Node rootYaml = *rootYamlNode;
+  YAML::Node flowControllerNode = rootYaml[CONFIG_YAML_FLOW_CONTROLLER_KEY];
+  YAML::Node processorsNode = rootYaml[CONFIG_YAML_PROCESSORS_KEY];
+  YAML::Node connectionsNode = rootYaml[yaml::YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY];
+  YAML::Node controllerServiceNode = rootYaml[CONFIG_YAML_CONTROLLER_SERVICES_KEY];
+  YAML::Node remoteProcessingGroupsNode = rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
+
+  // Fall back to the alternative CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3 key when the primary one is absent
+  if (!remoteProcessingGroupsNode) {
+    remoteProcessingGroupsNode = rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3];
+  }
+
+  YAML::Node provenanceReportNode = rootYaml[CONFIG_YAML_PROVENANCE_REPORT_KEY];
+
+  parseControllerServices(&controllerServiceNode);
+  // Create the root process group
+  std::unique_ptr<core::ProcessGroup> root = parseRootProcessGroupYaml(flowControllerNode);
+  parseProcessorNodeYaml(processorsNode, root.get());
+  parseRemoteProcessGroupYaml(&remoteProcessingGroupsNode, root.get());
+  parseConnectionYaml(&connectionsNode, root.get());
+  parseProvenanceReportingYaml(&provenanceReportNode, root.get());
+
+  // set the controller services into the root group.
+  for (const auto& controller_service : controller_services_->getAllControllerServices()) {
+    root->addControllerService(controller_service->getName(), controller_service);
+    root->addControllerService(controller_service->getUUIDStr(), controller_service);
+  }
+
+  return root;
+}
+
void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode,
core::ProcessGroup *parentGroup) {
int64_t schedulingPeriod = -1;
int64_t penalizationPeriod = -1;
@@ -111,7 +144,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node
processorsNode, core::
core::ProcessorConfig procCfg;
YAML::Node procNode = iter->as<YAML::Node>();
- checkRequiredField(&procNode, "name",
+ yaml::checkRequiredField(&procNode, "name", logger_,
CONFIG_YAML_PROCESSORS_KEY);
procCfg.name = procNode["name"].as<std::string>();
procCfg.id = getOrGenerateId(&procNode);
@@ -126,7 +159,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node
processorsNode, core::
uuid = procCfg.id.c_str();
logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]",
procCfg.name, procCfg.id);
- checkRequiredField(&procNode, "class", CONFIG_YAML_PROCESSORS_KEY);
+ yaml::checkRequiredField(&procNode, "class", logger_,
CONFIG_YAML_PROCESSORS_KEY);
procCfg.javaClass = procNode["class"].as<std::string>();
logger_->log_debug("parseProcessorNode: class => [%s]",
procCfg.javaClass);
@@ -281,7 +314,7 @@ void
YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
for (YAML::const_iterator iter = rpgNode->begin(); iter !=
rpgNode->end(); ++iter) {
YAML::Node currRpgNode = iter->as<YAML::Node>();
- checkRequiredField(&currRpgNode, "name",
+ yaml::checkRequiredField(&currRpgNode, "name", logger_,
CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
auto name = currRpgNode["name"].as<std::string>();
id = getOrGenerateId(&currRpgNode);
@@ -369,7 +402,7 @@ void
YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
group->setTransmitting(true);
group->setURL(url);
- checkRequiredField(&currRpgNode, "Input Ports",
+ yaml::checkRequiredField(&currRpgNode, "Input Ports", logger_,
CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
if (inputPorts && inputPorts.IsSequence()) {
@@ -414,10 +447,10 @@ void
YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor
YAML::Node node = reportNode->as<YAML::Node>();
- checkRequiredField(&node, "scheduling strategy",
+ yaml::checkRequiredField(&node, "scheduling strategy", logger_,
CONFIG_YAML_PROVENANCE_REPORT_KEY);
auto schedulingStrategyStr = node["scheduling strategy"].as<std::string>();
- checkRequiredField(&node, "scheduling period",
+ yaml::checkRequiredField(&node, "scheduling period", logger_,
CONFIG_YAML_PROVENANCE_REPORT_KEY);
auto schedulingPeriodStr = node["scheduling period"].as<std::string>();
@@ -453,9 +486,9 @@ void
YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor
logger_->log_debug("ProvenanceReportingTask URL %s", urlStr);
}
}
- checkRequiredField(&node, "port uuid", CONFIG_YAML_PROVENANCE_REPORT_KEY);
+ yaml::checkRequiredField(&node, "port uuid", logger_,
CONFIG_YAML_PROVENANCE_REPORT_KEY);
auto portUUIDStr = node["port uuid"].as<std::string>();
- checkRequiredField(&node, "batch size", CONFIG_YAML_PROVENANCE_REPORT_KEY);
+ yaml::checkRequiredField(&node, "batch size", logger_,
CONFIG_YAML_PROVENANCE_REPORT_KEY);
auto batchSizeStr = node["batch size"].as<std::string>();
logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr);
@@ -479,17 +512,17 @@ void
YamlConfiguration::parseControllerServices(YAML::Node *controllerServicesNo
for (auto iter : *controllerServicesNode) {
YAML::Node controllerServiceNode = iter.as<YAML::Node>();
try {
- checkRequiredField(&controllerServiceNode, "name",
+ yaml::checkRequiredField(&controllerServiceNode, "name", logger_,
CONFIG_YAML_CONTROLLER_SERVICES_KEY);
- checkRequiredField(&controllerServiceNode, "id",
+ yaml::checkRequiredField(&controllerServiceNode, "id", logger_,
CONFIG_YAML_CONTROLLER_SERVICES_KEY);
std::string type = "";
try {
- checkRequiredField(&controllerServiceNode, "class",
CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+ yaml::checkRequiredField(&controllerServiceNode, "class", logger_,
CONFIG_YAML_CONTROLLER_SERVICES_KEY);
type = controllerServiceNode["class"].as<std::string>();
} catch (const std::invalid_argument &) {
- checkRequiredField(&controllerServiceNode, "type",
CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+ yaml::checkRequiredField(&controllerServiceNode, "type", logger_,
CONFIG_YAML_CONTROLLER_SERVICES_KEY);
type = controllerServiceNode["type"].as<std::string>();
logger_->log_debug("Using type %s for controller service node",
type);
}
@@ -530,177 +563,44 @@ void
YamlConfiguration::parseControllerServices(YAML::Node *controllerServicesNo
}
}
-void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode,
core::ProcessGroup *parent) {
+/**
+ * Parses the connection definitions of the flow configuration and adds them to `parent`.
+ *
+ * Logs an error and returns when `parent` is null; returns without doing anything when
+ * `connectionsNode` is null or not a YAML sequence. For each entry, the connection id is
+ * obtained via getOrGenerateId() and the name defaults to that id when "name" is absent.
+ * All other attributes are read through yaml::YamlConnectionParser.
+ *
+ * @throws Exception (GENERAL_EXCEPTION) if a connection id is not a valid UUID
+ * @throws std::invalid_argument (from YamlConnectionParser) if a source or destination cannot be resolved
+ */
+void YamlConfiguration::parseConnectionYaml(YAML::Node* connectionsNode, core::ProcessGroup* parent) {
if (!parent) {
logger_->log_error("parseProcessNode: no parent group was provided");
return;
}
+  if (!connectionsNode || !connectionsNode->IsSequence()) {
+    return;
+  }
-  if (connectionsNode) {
-    if (connectionsNode->IsSequence()) {
-      for (YAML::const_iterator iter = connectionsNode->begin(); iter !=
connectionsNode->end(); ++iter) {
-        YAML::Node connectionNode = iter->as<YAML::Node>();
-        std::shared_ptr<minifi::Connection> connection = nullptr;
-
-        // Configure basic connection
-        utils::Identifier uuid;
-        std::string id = getOrGenerateId(&connectionNode);
-
-        // Default name to be same as ID
-        std::string name = id;
-
-        // If name is specified in configuration, use the value
-        if (connectionNode["name"]) {
-          name = connectionNode["name"].as<std::string>();
-        }
-
-        uuid = id;
-        connection = this->createConnection(name, uuid);
-        logger_->log_debug("Created connection with UUID %s and name %s", id,
name);
-
-        // Configure connection source
-        if (connectionNode.as<YAML::Node>()["source relationship name"]) {
-          auto rawRelationship = connectionNode["source relationship
name"].as<std::string>();
-          core::Relationship relationship(rawRelationship, "");
-          logger_->log_debug("parseConnection: relationship => [%s]",
rawRelationship);
-          if (connection) {
-            connection->addRelationship(relationship);
-          }
-        } else if (connectionNode.as<YAML::Node>()["source relationship
names"]) {
-          auto relList = connectionNode["source relationship names"];
-          if (connection) {
-            if (relList.IsSequence()) {
-              for (const auto &rel : relList) {
-                auto rawRelationship = rel.as<std::string>();
-                core::Relationship relationship(rawRelationship, "");
-                logger_->log_debug("parseConnection: relationship => [%s]",
rawRelationship);
-                connection->addRelationship(relationship);
-              }
-            } else {
-              auto rawRelationship = relList.as<std::string>();
-              core::Relationship relationship(rawRelationship, "");
-              logger_->log_debug("parseConnection: relationship => [%s]",
rawRelationship);
-              connection->addRelationship(relationship);
-            }
-          }
-        }
-
-        utils::Identifier srcUUID;
+  for (YAML::const_iterator iter = connectionsNode->begin(); iter != connectionsNode->end(); ++iter) {
+    YAML::Node connectionNode = iter->as<YAML::Node>();
-        if (connectionNode["max work queue size"]) {
-          auto max_work_queue_str = connectionNode["max work queue
size"].as<std::string>();
-          uint64_t max_work_queue_size = 0;
-          if (core::Property::StringToInt(max_work_queue_str,
max_work_queue_size)) {
-            connection->setMaxQueueSize(max_work_queue_size);
-          }
-          logging::LOG_DEBUG(logger_) << "Setting " << max_work_queue_size <<
" as the max queue size for " << name;
-        }
+    // Configure basic connection
+    std::string id = getOrGenerateId(&connectionNode);
-        if (connectionNode["max work queue data size"]) {
-          auto max_work_queue_str = connectionNode["max work queue data
size"].as<std::string>();
-          uint64_t max_work_queue_data_size = 0;
-          if (core::Property::StringToInt(max_work_queue_str,
max_work_queue_data_size)) {
-            connection->setMaxQueueDataSize(max_work_queue_data_size);
-          }
-          logging::LOG_DEBUG(logger_) << "Setting " <<
max_work_queue_data_size << " as the max queue data size for " << name;
-        }
+    // The name defaults to the id when "name" is not specified in the configuration
+    std::string name = connectionNode["name"].as<std::string>(id);
-        if (connectionNode["source id"]) {
-          std::string connectionSrcProcId = connectionNode["source
id"].as<std::string>();
-          srcUUID = connectionSrcProcId;
-          logger_->log_debug("Using 'source id' to match source with same id
for "
-                             "connection '%s': source id => [%s]",
-                             name, connectionSrcProcId);
-        } else {
-          // if we don't have a source id, try to resolve using source name.
config schema v2 will make this unnecessary
-          checkRequiredField(&connectionNode, "source name",
-                             CONFIG_YAML_CONNECTIONS_KEY);
-          std::string connectionSrcProcName = connectionNode["source
name"].as<std::string>();
-          utils::optional<utils::Identifier> tmpUUID =
utils::Identifier::parse(connectionSrcProcName);
-          if (tmpUUID && NULL != parent->findProcessorById(tmpUUID.value())) {
-            // the source name is a remote port id, so use that as the source
id
-            srcUUID = tmpUUID.value();
-            logger_->log_debug("Using 'source name' containing a remote port
id to match the source for "
-                               "connection '%s': source name => [%s]",
-                               name, connectionSrcProcName);
-          } else {
-            // lastly, look the processor up by name
-            auto srcProcessor =
parent->findProcessorByName(connectionSrcProcName);
-            if (NULL != srcProcessor) {
-              srcUUID = srcProcessor->getUUID();
-              logger_->log_debug("Using 'source name' to match source with
same name for "
-                                 "connection '%s': source name => [%s]",
-                                 name, connectionSrcProcName);
-            } else {
-              // we ran out of ways to discover the source processor
-              logger_->log_error("Could not locate a source with name %s to
create a connection", connectionSrcProcName);
-              throw std::invalid_argument("Could not locate a source with name
" + connectionSrcProcName + " to create a connection ");
-            }
-          }
-        }
-        connection->setSourceUUID(srcUUID);
-
-        // Configure connection destination
-        utils::Identifier destUUID;
-        if (connectionNode["destination id"]) {
-          std::string connectionDestProcId = connectionNode["destination
id"].as<std::string>();
-          destUUID = connectionDestProcId;
-          logger_->log_debug("Using 'destination id' to match destination with
same id for "
-                             "connection '%s': destination id => [%s]",
-                             name, connectionDestProcId);
-        } else {
-          // we use the same logic as above for resolving the source processor
-          // for looking up the destination processor in absence of a
processor id
-          checkRequiredField(&connectionNode, "destination name",
-                             CONFIG_YAML_CONNECTIONS_KEY);
-          std::string connectionDestProcName = connectionNode["destination
name"].as<std::string>();
-          utils::optional<utils::Identifier> tmpUUID =
utils::Identifier::parse(connectionDestProcName);
-          if (tmpUUID && parent->findProcessorById(tmpUUID.value())) {
-            // the destination name is a remote port id, so use that as the
dest id
-            destUUID = tmpUUID.value();
-            logger_->log_debug("Using 'destination name' containing a remote
port id to match the destination for "
-                               "connection '%s': destination name => [%s]",
-                               name, connectionDestProcName);
-          } else {
-            // look the processor up by name
-            auto destProcessor =
parent->findProcessorByName(connectionDestProcName);
-            if (NULL != destProcessor) {
-              destUUID = destProcessor->getUUID();
-              logger_->log_debug("Using 'destination name' to match
destination with same name for "
-                                 "connection '%s': destination name => [%s]",
-                                 name, connectionDestProcName);
-            } else {
-              // we ran out of ways to discover the destination processor
-              logger_->log_error("Could not locate a destination with name %s
to create a connection", connectionDestProcName);
-              throw std::invalid_argument("Could not locate a destination with
name " + connectionDestProcName + " to create a connection");
-            }
-          }
-        }
-        connection->setDestinationUUID(destUUID);
-
-        if (connectionNode["flowfile expiration"]) {
-          uint64_t expirationDuration = 0;
-          std::string expiration = connectionNode["flowfile
expiration"].as<std::string>();
-          TimeUnit unit;
-          if (core::Property::StringToTime(expiration, expirationDuration,
unit) && core::Property::ConvertTimeUnitToMS(expirationDuration, unit,
expirationDuration)) {
-            logger_->log_debug("parseConnection: flowfile expiration => [%d]",
expirationDuration);
-            connection->setFlowExpirationDuration(expirationDuration);
-          }
-        }
+    const utils::optional<utils::Identifier> uuid = utils::Identifier::parse(id);
+    if (!uuid) {
+      // This aborts parsing of the whole configuration, so report it at error level
+      logger_->log_error("Incorrect connection UUID format.");
+      throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect connection UUID format.");
+    }
-        if (connectionNode["drop empty"]) {
-          std::string strvalue = connectionNode["drop
empty"].as<std::string>();
-          bool dropEmpty = false;
-          if (utils::StringUtils::StringToBool(strvalue, dropEmpty)) {
-            connection->setDropEmptyFlowFiles(dropEmpty);
-          }
-        }
+    const std::shared_ptr<minifi::Connection> connection = createConnection(name, uuid.value());
+    logger_->log_debug("Created connection with UUID %s and name %s", id, name);
+    const yaml::YamlConnectionParser connectionParser(connectionNode, name, gsl::not_null<core::ProcessGroup*>{ parent }, logger_);
+    connectionParser.configureConnectionSourceRelationshipsFromYaml(connection);
+    connection->setMaxQueueSize(connectionParser.getWorkQueueSizeFromYaml());
+    connection->setMaxQueueDataSize(connectionParser.getWorkQueueDataSizeFromYaml());
+    connection->setSourceUUID(connectionParser.getSourceUUIDFromYaml());
+    connection->setDestinationUUID(connectionParser.getDestinationUUIDFromYaml());
+    connection->setFlowExpirationDuration(connectionParser.getFlowFileExpirationFromYaml());
+    connection->setDropEmptyFlowFiles(connectionParser.getDropEmptyFromYaml());
-        if (connection) {
-          parent->addConnection(connection);
-        }
-      }
-    }
+    parent->addConnection(connection);
}
}
@@ -717,10 +617,10 @@ void YamlConfiguration::parsePortYaml(YAML::Node
*portNode, core::ProcessGroup *
YAML::Node inputPortsObj = portNode->as<YAML::Node>();
// Check for required fields
- checkRequiredField(&inputPortsObj, "name",
+ yaml::checkRequiredField(&inputPortsObj, "name", logger_,
CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
auto nameStr = inputPortsObj["name"].as<std::string>();
- checkRequiredField(&inputPortsObj, "id",
+ yaml::checkRequiredField(&inputPortsObj, "id", logger_,
CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY,
"The field 'id' is required for "
"the port named '" + nameStr + "' in the YAML Config.
If this port "
@@ -989,26 +889,6 @@ std::string YamlConfiguration::getOrGenerateId(YAML::Node
*yamlNode, const std::
return id;
}
-void YamlConfiguration::checkRequiredField(YAML::Node *yamlNode, const
std::string &fieldName, const std::string &yamlSection, const std::string
&errorMessage) {
- std::string errMsg = errorMessage;
- if (!yamlNode->as<YAML::Node>()[fieldName]) {
- if (errMsg.empty()) {
- // Build a helpful error message for the user so they can fix the
- // invalid YAML config file, using the component name if present
- errMsg =
- yamlNode->as<YAML::Node>()["name"] ?
- "Unable to parse configuration file for component named '" +
yamlNode->as<YAML::Node>()["name"].as<std::string>() + "' as required field '"
+ fieldName + "' is missing" :
- "Unable to parse configuration file as required field '" +
fieldName + "' is missing";
- if (!yamlSection.empty()) {
- errMsg += " [in '" + yamlSection + "' section of configuration file]";
- }
- }
- logging::LOG_ERROR(logger_) << errMsg;
-
- throw std::invalid_argument(errMsg);
- }
-}
-
YAML::Node YamlConfiguration::getOptionalField(YAML::Node *yamlNode, const
std::string &fieldName, const YAML::Node &defaultValue, const std::string
&yamlSection,
const std::string
&providedInfoMessage) {
std::string infoMessage = providedInfoMessage;
diff --git a/libminifi/src/core/yaml/YamlConnectionParser.cpp
b/libminifi/src/core/yaml/YamlConnectionParser.cpp
new file mode 100644
index 0000000..cd26117
--- /dev/null
+++ b/libminifi/src/core/yaml/YamlConnectionParser.cpp
@@ -0,0 +1,182 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cinttypes>
+#include <memory>
+#include <string>
+
+#include "core/yaml/YamlConnectionParser.h"
+#include "core/yaml/CheckRequiredField.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace yaml {
+
+// Out-of-line definition of the static constexpr data member. Before C++17 it is required
+// whenever the member is odr-used (e.g. bound to a const reference parameter); since C++17
+// static constexpr data members are implicitly inline, so this line becomes redundant (and deprecated).
+constexpr const char* YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY;
+
+/**
+ * Adds the source relationship(s) configured for this connection to `connection`.
+ *
+ * "source relationship name" (a single name) takes precedence. Otherwise
+ * "source relationship names" is used, which may hold either a sequence of names or a
+ * single scalar name. Each relationship is created with an empty description.
+ */
+void YamlConnectionParser::configureConnectionSourceRelationshipsFromYaml(const std::shared_ptr<minifi::Connection>& connection) const {
+  const auto add_relationship = [&] (const std::string& relationship_name) {
+    core::Relationship relationship(relationship_name, "");
+    logger_->log_debug("parseConnection: relationship => [%s]", relationship_name);
+    connection->addRelationship(std::move(relationship));
+  };
+
+  // Configure connection source
+  if (connectionNode_.as<YAML::Node>()["source relationship name"]) {
+    add_relationship(connectionNode_["source relationship name"].as<std::string>());
+    return;
+  }
+  if (!connectionNode_.as<YAML::Node>()["source relationship names"]) {
+    return;
+  }
+  YAML::Node relationship_names = connectionNode_["source relationship names"];
+  if (!relationship_names.IsSequence()) {
+    // A single scalar name is also accepted under the plural key
+    add_relationship(relationship_names.as<std::string>());
+    return;
+  }
+  for (const auto& name_node : relationship_names) {
+    add_relationship(name_node.as<std::string>());
+  }
+}
+
+/**
+ * Reads the optional "max work queue size" field.
+ *
+ * @return the parsed value, or 0 when the field is absent or is not a valid integer
+ *         (an invalid value is reported at info level)
+ */
+uint64_t YamlConnectionParser::getWorkQueueSizeFromYaml() const {
+  const YAML::Node max_work_queue_size_node = connectionNode_["max work queue size"];
+  if (max_work_queue_size_node) {
+    auto max_work_queue_str = max_work_queue_size_node.as<std::string>();
+    uint64_t max_work_queue_size = 0;
+    if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) {
+      logger_->log_debug("Setting %" PRIu64 " as the max queue size.", max_work_queue_size);
+      return max_work_queue_size;
+    }
+    logger_->log_info("Invalid max queue size value: %s.", max_work_queue_str);
+  }
+  return 0;
+}
+
+/**
+ * Reads the optional "max work queue data size" field.
+ *
+ * @return the parsed value, or 0 when the field is absent or is not a valid integer
+ *         (an invalid value is reported at info level)
+ */
+uint64_t YamlConnectionParser::getWorkQueueDataSizeFromYaml() const {
+  const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue data size"];
+  if (max_work_queue_data_size_node) {
+    auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+    uint64_t max_work_queue_data_size = 0;
+    if (core::Property::StringToInt(max_work_queue_str, max_work_queue_data_size)) {
+      logger_->log_debug("Setting %" PRIu64 " as the max queue data size.", max_work_queue_data_size);
+      return max_work_queue_data_size;
+    }
+    logger_->log_info("Invalid max queue data size value: %s.", max_work_queue_str);
+  }
+  return 0;
+}
+
+/**
+ * Resolves the UUID of the connection's source component.
+ *
+ * Resolution order:
+ *  1. "source id", which must be a valid UUID;
+ *  2. otherwise the required "source name": if it parses as a UUID of a processor found in the
+ *     parent group (e.g. a remote port id), that UUID is used;
+ *  3. otherwise the UUID of the processor with that name in the parent group.
+ *
+ * @throws std::invalid_argument if "source id" is not a valid UUID, if neither "source id" nor
+ *         "source name" is present, or if no matching source can be found
+ */
+utils::Identifier YamlConnectionParser::getSourceUUIDFromYaml() const {
+  const YAML::Node source_id_node = connectionNode_["source id"];
+  if (source_id_node) {
+    const utils::optional<utils::Identifier> srcUUID = utils::Identifier::parse(source_id_node.as<std::string>());
+    if (srcUUID) {
+      logger_->log_debug("Using 'source id' to match source with same id for connection '%s': source id => [%s]", name_, srcUUID.value().to_string());
+      return srcUUID.value();
+    }
+    logger_->log_error("Invalid source id value: %s.", source_id_node.as<std::string>());
+    throw std::invalid_argument("Invalid source id");
+  }
+  // if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary
+  checkRequiredField(&connectionNode_, "source name", logger_, CONFIG_YAML_CONNECTIONS_KEY);
+  const std::string connectionSrcProcName = connectionNode_["source name"].as<std::string>();
+  const utils::optional<utils::Identifier> srcUUID = utils::Identifier::parse(connectionSrcProcName);
+  if (srcUUID && parent_->findProcessorById(srcUUID.value())) {
+    // the source name is a remote port id, so use that as the source id
+    logger_->log_debug("Using 'source name' containing a remote port id to match the source for connection '%s': source name => [%s]", name_, connectionSrcProcName);
+    return srcUUID.value();
+  }
+  // lastly, look the processor up by name
+  const auto srcProcessor = parent_->findProcessorByName(connectionSrcProcName);
+  if (nullptr != srcProcessor) {
+    logger_->log_debug("Using 'source name' to match source with same name for connection '%s': source name => [%s]", name_, connectionSrcProcName);
+    return srcProcessor->getUUID();
+  }
+  // we ran out of ways to discover the source processor
+  const std::string error_msg = "Could not locate a source with name " + connectionSrcProcName + " to create a connection ";
+  // error_msg embeds text from the configuration file, so pass it as an argument, not as the format string
+  logger_->log_error("%s", error_msg);
+  throw std::invalid_argument(error_msg);
+}
+
+/**
+ * Resolves the UUID of the connection's destination component.
+ *
+ * Resolution order:
+ *  1. "destination id", which must be a valid UUID;
+ *  2. otherwise the required "destination name": if it parses as a UUID of a processor found in
+ *     the parent group (e.g. a remote port id), that UUID is used;
+ *  3. otherwise the UUID of the processor with that name in the parent group.
+ *
+ * @throws std::invalid_argument if "destination id" is not a valid UUID, if neither
+ *         "destination id" nor "destination name" is present, or if no matching destination
+ *         can be found
+ */
+utils::Identifier YamlConnectionParser::getDestinationUUIDFromYaml() const {
+  const YAML::Node destination_id_node = connectionNode_["destination id"];
+  if (destination_id_node) {
+    const utils::optional<utils::Identifier> destUUID = utils::Identifier::parse(destination_id_node.as<std::string>());
+    if (destUUID) {
+      logger_->log_debug("Using 'destination id' to match destination with same id for connection '%s': destination id => [%s]", name_, destUUID.value().to_string());
+      return destUUID.value();
+    }
+    logger_->log_error("Invalid destination id value: %s.", destination_id_node.as<std::string>());
+    throw std::invalid_argument("Invalid destination id");
+  }
+  // we use the same logic as above for resolving the source processor
+  // for looking up the destination processor in absence of a processor id
+  checkRequiredField(&connectionNode_, "destination name", logger_, CONFIG_YAML_CONNECTIONS_KEY);
+  const std::string connectionDestProcName = connectionNode_["destination name"].as<std::string>();
+  const utils::optional<utils::Identifier> destUUID = utils::Identifier::parse(connectionDestProcName);
+  if (destUUID && parent_->findProcessorById(destUUID.value())) {
+    // the destination name is a remote port id, so use that as the dest id
+    logger_->log_debug("Using 'destination name' containing a remote port id to match the destination for connection '%s': destination name => [%s]", name_, connectionDestProcName);
+    return destUUID.value();
+  }
+  // look the processor up by name
+  const auto destProcessor = parent_->findProcessorByName(connectionDestProcName);
+  if (nullptr != destProcessor) {
+    logger_->log_debug("Using 'destination name' to match destination with same name for connection '%s': destination name => [%s]", name_, connectionDestProcName);
+    return destProcessor->getUUID();
+  }
+  // we ran out of ways to discover the destination processor
+  const std::string error_msg = "Could not locate a destination with name " + connectionDestProcName + " to create a connection";
+  // error_msg embeds text from the configuration file, so pass it as an argument, not as the format string
+  logger_->log_error("%s", error_msg);
+  throw std::invalid_argument(error_msg);
+}
+
+/**
+ * Reads the optional "flowfile expiration" field and converts it to milliseconds.
+ *
+ * @return the expiration in milliseconds, or 0 (never expire) when the field is absent or
+ *         cannot be parsed/converted
+ */
+uint64_t YamlConnectionParser::getFlowFileExpirationFromYaml() const {
+  const YAML::Node expiration_node = connectionNode_["flowfile expiration"];
+  if (!expiration_node) {
+    logger_->log_debug("parseConnection: flowfile expiration is not set, assuming 0 (never expire)");
+    return 0;
+  }
+  uint64_t expirationDuration = 0;
+  TimeUnit unit;
+  const std::string flowfile_expiration_str = expiration_node.as<std::string>();
+  if (!core::Property::StringToTime(flowfile_expiration_str, expirationDuration, unit) ||
+      !core::Property::ConvertTimeUnitToMS(expirationDuration, unit, expirationDuration)) {
+    // We should throw here, but we do not.
+    // The reason is that our parser only accepts time formats that consist of a number and
+    // a unit, but users might populate this field with a bare "0" (no units).
+    // We cannot correct this, because there is no API contract for the config: we need to support
+    // all already-supported configuration files.
+    // This has the side-effect of allowing values like "20 minuites" and silently defaulting to 0.
+    // Return 0 explicitly: a failed parse or conversion may have left a partial or unconverted
+    // value in expirationDuration.
+    logger_->log_debug("Parsing failure for flowfile expiration duration");
+    return 0;
+  }
+  logger_->log_debug("parseConnection: flowfile expiration => [%" PRIu64 "]", expirationDuration);
+  return expirationDuration;
+}
+
+/**
+ * Reads the optional "drop empty" flag.
+ *
+ * @return true only when the field is present, StringToBool reports success and the converted
+ *         value is true; false in every other case
+ */
+bool YamlConnectionParser::getDropEmptyFromYaml() const {
+  const YAML::Node drop_empty_node = connectionNode_["drop empty"];
+  if (!drop_empty_node) {
+    return false;
+  }
+  bool drop_empty = false;
+  const bool converted = utils::StringUtils::StringToBool(drop_empty_node.as<std::string>(), drop_empty);
+  return converted && drop_empty;
+}
+
+} // namespace yaml
+} // namespace core
+} // namespace minifi
+} // namespace nifi
+} // namespace apache
+} // namespace org
diff --git a/main/MainHelper.h b/main/MainHelper.h
index ea45833..247bea0 100644
--- a/main/MainHelper.h
+++ b/main/MainHelper.h
@@ -45,13 +45,6 @@ extern "C" {
//! Define home environment variable
#define MINIFI_HOME_ENV_KEY "MINIFI_HOME"
-/* Define Parser Values for Configuration YAML sections */
-#define CONFIG_YAML_PROCESSORS_KEY "Processors"
-#define CONFIG_YAML_FLOW_CONTROLLER_KEY "Flow Controller"
-#define CONFIG_YAML_CONNECTIONS_KEY "Connections"
-#define CONFIG_YAML_REMOTE_PROCESSING_GROUPS_KEY "Remote Processing Groups"
-
-
#ifdef _MSC_VER
#ifndef PATH_MAX
#define PATH_MAX 260