This is an automated email from the ASF dual-hosted git repository.

adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 960b6dc  MINIFICPP-1325 - Refactor and test YAML connection parsing
960b6dc is described below

commit 960b6dc0cf66c9cb28d97a90e4d9dd920a1af196
Author: Adam Hunyadi <[email protected]>
AuthorDate: Fri Feb 26 12:06:48 2021 +0100

    MINIFICPP-1325 - Refactor and test YAML connection parsing
    
    Signed-off-by: Adam Debreceni <[email protected]>
    
    This closes #948
---
 .../tests/unit/YamlConfigurationTests.cpp          | 203 +++++++--------
 .../tests/unit/YamlConnectionParserTest.cpp        | 198 +++++++++++++++
 libminifi/include/core/FlowFile.h                  |   2 +-
 libminifi/include/core/ProcessGroup.h              |   2 +-
 libminifi/include/core/yaml/CheckRequiredField.h   |  59 +++++
 libminifi/include/core/yaml/YamlConfiguration.h    |  57 +----
 libminifi/include/core/yaml/YamlConnectionParser.h |  66 +++++
 libminifi/include/utils/Id.h                       |   1 -
 libminifi/include/utils/TestUtils.h                |   8 +
 libminifi/src/core/ProcessGroup.cpp                |   8 +-
 libminifi/src/core/yaml/CheckRequiredField.cpp     |  59 +++++
 libminifi/src/core/yaml/YamlConfiguration.cpp      | 276 ++++++---------------
 libminifi/src/core/yaml/YamlConnectionParser.cpp   | 182 ++++++++++++++
 main/MainHelper.h                                  |   7 -
 14 files changed, 764 insertions(+), 364 deletions(-)

diff --git a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
index c9ce42c..157fd03 100644
--- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp
@@ -19,9 +19,12 @@
 #include <map>
 #include <memory>
 #include "core/repository/VolatileContentRepository.h"
+#include "core/ProcessGroup.h"
 #include "core/RepositoryFactory.h"
 #include "core/yaml/YamlConfiguration.h"
+#include "TailFile.h"
 #include "TestBase.h"
+#include "utils/TestUtils.h"
 
 TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
   TestController test_controller;
@@ -35,105 +38,105 @@ TEST_CASE("Test YAML Config Processing", "[YamlConfiguration]") {
 
   SECTION("loading YAML without optional component IDs works") {
   static const std::string CONFIG_YAML_WITHOUT_IDS = ""
-  "MiNiFi Config Version: 1\n"
-  "Flow Controller:\n"
-  "    name: MiNiFi Flow\n"
-  "    comment:\n"
-  "\n"
-  "Core Properties:\n"
-  "    flow controller graceful shutdown period: 10 sec\n"
-  "    flow service write delay interval: 500 ms\n"
-  "    administrative yield duration: 30 sec\n"
-  "    bored yield duration: 10 millis\n"
-  "\n"
-  "FlowFile Repository:\n"
-  "    partitions: 256\n"
-  "    checkpoint interval: 2 mins\n"
-  "    always sync: false\n"
-  "    Swap:\n"
-  "        threshold: 20000\n"
-  "        in period: 5 sec\n"
-  "        in threads: 1\n"
-  "        out period: 5 sec\n"
-  "        out threads: 4\n"
-  "\n"
-  "Provenance Repository:\n"
-  "    provenance rollover time: 1 min\n"
-  "\n"
-  "Content Repository:\n"
-  "    content claim max appendable size: 10 MB\n"
-  "    content claim max flow files: 100\n"
-  "    always sync: false\n"
-  "\n"
-  "Component Status Repository:\n"
-  "    buffer size: 1440\n"
-  "    snapshot frequency: 1 min\n"
-  "\n"
-  "Security Properties:\n"
-  "    keystore: /tmp/ssl/localhost-ks.jks\n"
-  "    keystore type: JKS\n"
-  "    keystore password: localtest\n"
-  "    key password: localtest\n"
-  "    truststore: /tmp/ssl/localhost-ts.jks\n"
-  "    truststore type: JKS\n"
-  "    truststore password: localtest\n"
-  "    ssl protocol: TLS\n"
-  "    Sensitive Props:\n"
-  "        key:\n"
-  "        algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL\n"
-  "        provider: BC\n"
-  "\n"
-  "Processors:\n"
-  "    - name: TailFile\n"
-  "      class: org.apache.nifi.processors.standard.TailFile\n"
-  "      max concurrent tasks: 1\n"
-  "      scheduling strategy: TIMER_DRIVEN\n"
-  "      scheduling period: 1 sec\n"
-  "      penalization period: 30 sec\n"
-  "      yield period: 1 sec\n"
-  "      run duration nanos: 0\n"
-  "      auto-terminated relationships list:\n"
-  "      Properties:\n"
-  "          File to Tail: logs/minifi-app.log\n"
-  "          Rolling Filename Pattern: minifi-app*\n"
-  "          Initial Start Position: Beginning of File\n"
-  "\n"
-  "Connections:\n"
-  "    - name: TailToS2S\n"
-  "      source name: TailFile\n"
-  "      source relationship name: success\n"
-  "      destination name: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n"
-  "      max work queue size: 0\n"
-  "      max work queue data size: 1 MB\n"
-  "      flowfile expiration: 60 sec\n"
-  "      queue prioritizer class: 
org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer\n"
-  "\n"
-  "Remote Processing Groups:\n"
-  "    - name: NiFi Flow\n"
-  "      comment:\n"
-  "      url: https://localhost:8090/nifi\n";
-  "      timeout: 30 secs\n"
-  "      yield period: 10 sec\n"
-  "      Input Ports:\n"
-  "          - id: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n"
-  "            name: tailed log\n"
-  "            comments:\n"
-  "            max concurrent tasks: 1\n"
-  "            use compression: false\n"
-  "\n"
-  "Provenance Reporting:\n"
-  "    comment:\n"
-  "    scheduling strategy: TIMER_DRIVEN\n"
-  "    scheduling period: 30 sec\n"
-  "    host: localhost\n"
-  "    port name: provenance\n"
-  "    port: 8090\n"
-  "    port uuid: 2f389b8d-83f2-48d3-b465-048f28a1cb56\n"
-  "    url: https://localhost:8090/\n";
-  "    originating url: http://${hostname(true)}:8081/nifi\n"
-  "    use compression: true\n"
-  "    timeout: 30 secs\n"
-  "    batch size: 1000";
+      "MiNiFi Config Version: 1\n"
+      "Flow Controller:\n"
+      "    name: MiNiFi Flow\n"
+      "    comment:\n"
+      "\n"
+      "Core Properties:\n"
+      "    flow controller graceful shutdown period: 10 sec\n"
+      "    flow service write delay interval: 500 ms\n"
+      "    administrative yield duration: 30 sec\n"
+      "    bored yield duration: 10 millis\n"
+      "\n"
+      "FlowFile Repository:\n"
+      "    partitions: 256\n"
+      "    checkpoint interval: 2 mins\n"
+      "    always sync: false\n"
+      "    Swap:\n"
+      "        threshold: 20000\n"
+      "        in period: 5 sec\n"
+      "        in threads: 1\n"
+      "        out period: 5 sec\n"
+      "        out threads: 4\n"
+      "\n"
+      "Provenance Repository:\n"
+      "    provenance rollover time: 1 min\n"
+      "\n"
+      "Content Repository:\n"
+      "    content claim max appendable size: 10 MB\n"
+      "    content claim max flow files: 100\n"
+      "    always sync: false\n"
+      "\n"
+      "Component Status Repository:\n"
+      "    buffer size: 1440\n"
+      "    snapshot frequency: 1 min\n"
+      "\n"
+      "Security Properties:\n"
+      "    keystore: /tmp/ssl/localhost-ks.jks\n"
+      "    keystore type: JKS\n"
+      "    keystore password: localtest\n"
+      "    key password: localtest\n"
+      "    truststore: /tmp/ssl/localhost-ts.jks\n"
+      "    truststore type: JKS\n"
+      "    truststore password: localtest\n"
+      "    ssl protocol: TLS\n"
+      "    Sensitive Props:\n"
+      "        key:\n"
+      "        algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL\n"
+      "        provider: BC\n"
+      "\n"
+      "Processors:\n"
+      "    - name: TailFile\n"
+      "      class: org.apache.nifi.processors.standard.TailFile\n"
+      "      max concurrent tasks: 1\n"
+      "      scheduling strategy: TIMER_DRIVEN\n"
+      "      scheduling period: 1 sec\n"
+      "      penalization period: 30 sec\n"
+      "      yield period: 1 sec\n"
+      "      run duration nanos: 0\n"
+      "      auto-terminated relationships list:\n"
+      "      Properties:\n"
+      "          File to Tail: logs/minifi-app.log\n"
+      "          Rolling Filename Pattern: minifi-app*\n"
+      "          Initial Start Position: Beginning of File\n"
+      "\n"
+      "Connections:\n"
+      "    - name: TailToS2S\n"
+      "      source name: TailFile\n"
+      "      source relationship name: success\n"
+      "      destination name: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n"
+      "      max work queue size: 0\n"
+      "      max work queue data size: 1 MB\n"
+      "      flowfile expiration: 60 sec\n"
+      "      queue prioritizer class: 
org.apache.nifi.prioritizer.NewestFlowFileFirstPrioritizer\n"
+      "\n"
+      "Remote Processing Groups:\n"
+      "    - name: NiFi Flow\n"
+      "      comment:\n"
+      "      url: https://localhost:8090/nifi\n";
+      "      timeout: 30 secs\n"
+      "      yield period: 10 sec\n"
+      "      Input Ports:\n"
+      "          - id: 8644cbcc-a45c-40e0-964d-5e536e2ada61\n"
+      "            name: tailed log\n"
+      "            comments:\n"
+      "            max concurrent tasks: 1\n"
+      "            use compression: false\n"
+      "\n"
+      "Provenance Reporting:\n"
+      "    comment:\n"
+      "    scheduling strategy: TIMER_DRIVEN\n"
+      "    scheduling period: 30 sec\n"
+      "    host: localhost\n"
+      "    port name: provenance\n"
+      "    port: 8090\n"
+      "    port uuid: 2f389b8d-83f2-48d3-b465-048f28a1cb56\n"
+      "    url: https://localhost:8090/\n";
+      "    originating url: http://${hostname(true)}:8081/nifi\n"
+      "    use compression: true\n"
+      "    timeout: 30 secs\n"
+      "    batch size: 1000";
 
   std::istringstream configYamlStream(CONFIG_YAML_WITHOUT_IDS);
   std::unique_ptr<core::ProcessGroup> rootFlowConfig = 
yamlConfig.getYamlRoot(configYamlStream);
@@ -494,7 +497,7 @@ Processors:
 
   REQUIRE(rootFlowConfig);
   REQUIRE(rootFlowConfig->findProcessorByName("PutFile"));
-  utils::Identifier uuid = 
rootFlowConfig->findProcessorByName("PutFile")->getUUID();
+  const utils::Identifier uuid = 
rootFlowConfig->findProcessorByName("PutFile")->getUUID();
   REQUIRE(uuid);
   
REQUIRE(!rootFlowConfig->findProcessorByName("PutFile")->getUUIDStr().empty());
 
diff --git a/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
new file mode 100644
index 0000000..5925bb7
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/YamlConnectionParserTest.cpp
@@ -0,0 +1,198 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/yaml/YamlConnectionParser.h"
+
+#include "core/yaml/YamlConfiguration.h"
+#include "TailFile.h"
+#include "TestBase.h"
+#include "utils/TestUtils.h"
+
+namespace {
+
+using org::apache::nifi::minifi::core::yaml::YamlConnectionParser;
+using org::apache::nifi::minifi::core::YamlConfiguration;
+using RetryFlowFile = org::apache::nifi::minifi::processors::TailFile;
+
+TEST_CASE("Connections components are parsed from yaml", 
"[YamlConfiguration]") {
+  const std::shared_ptr<logging::Logger> logger = 
logging::LoggerFactory<YamlConfiguration>::getLogger();
+  core::ProcessGroup parent(core::ProcessGroupType::ROOT_PROCESS_GROUP, 
"root");
+  gsl::not_null<core::ProcessGroup*> parent_ptr{ &parent };
+
+  SECTION("Source relationships are read") {
+    const auto connection = std::make_shared<minifi::Connection>(nullptr, 
nullptr, "name");
+    std::string serialized_yaml;
+    std::set<org::apache::nifi::minifi::core::Relationship> expectations;
+    SECTION("Single relationship name") {
+      serialized_yaml = std::string { "source relationship name: success\n" };
+      expectations = { { "success", "" } };
+    }
+    SECTION("List of relationship names") {
+      serialized_yaml = std::string {
+          "source relationship names:\n"
+          "- success\n"
+          "- failure\n"
+          "- something_else\n" };
+      expectations = { { "success", "" }, { "failure", "" }, { 
"something_else", "" } };
+    }
+    YAML::Node connection_node = YAML::Load(serialized_yaml);
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", 
parent_ptr, logger);
+    
yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(connection);
+    const std::set<core::Relationship>& relationships = 
connection->getRelationships();
+    REQUIRE(expectations == relationships);
+  }
+  SECTION("Queue size limits are read") {
+    YAML::Node connection_node = YAML::Load(std::string {
+        "max work queue size: 231\n"
+        "max work queue data size: 12 MB\n" });
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", 
parent_ptr, logger);
+    REQUIRE(231 == yaml_connection_parser.getWorkQueueSizeFromYaml());
+    REQUIRE(12582912 == 
yaml_connection_parser.getWorkQueueDataSizeFromYaml());  // 12 * 1024 * 1024 B
+  }
+  SECTION("Source and destination names and uuids are read") {
+    const utils::Identifier expected_source_id = utils::generateUUID();
+    const utils::Identifier expected_destination_id = utils::generateUUID();
+    std::string serialized_yaml;
+    
parent.addProcessor(std::static_pointer_cast<core::Processor>(std::make_shared<processors::TailFile>("TailFile_1",
 expected_source_id)));
+    
parent.addProcessor(std::static_pointer_cast<core::Processor>(std::make_shared<processors::TailFile>("TailFile_2",
 expected_destination_id)));
+    SECTION("Directly from configuration") {
+      serialized_yaml = std::string {
+          "source id: " + expected_source_id.to_string() + "\n"
+          "destination id: " + expected_destination_id.to_string() + "\n" };
+    }
+    SECTION("Using UUID as remote processing group id") {
+      serialized_yaml = std::string {
+          "source name: " + expected_source_id.to_string() + "\n"
+          "destination name: " + expected_destination_id.to_string() + "\n" };
+    }
+    SECTION("Via processor name lookup") {
+      serialized_yaml = std::string {
+          "source name: TailFile_1\n"
+          "destination name: TailFile_2\n" };
+    }
+    YAML::Node connection_node = YAML::Load(serialized_yaml);
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", 
parent_ptr, logger);
+    REQUIRE(expected_source_id == 
yaml_connection_parser.getSourceUUIDFromYaml());
+    REQUIRE(expected_destination_id == 
yaml_connection_parser.getDestinationUUIDFromYaml());
+  }
+  SECTION("Flow file expiration is read") {
+    YAML::Node connection_node = YAML::Load(std::string {
+        "flowfile expiration: 2 min\n" });
+    YamlConnectionParser yaml_connection_parser(connection_node, "test_node", 
parent_ptr, logger);
+    REQUIRE(120000 == yaml_connection_parser.getFlowFileExpirationFromYaml()); 
 // 2 * 60 * 1000 ms
+  }
+  SECTION("Drop empty value is read") {
+    SECTION("When config contains true value") {
+      YAML::Node connection_node = YAML::Load(std::string {
+          "drop empty: true\n" });
+      YamlConnectionParser yaml_connection_parser(connection_node, 
"test_node", parent_ptr, logger);
+      REQUIRE(true == yaml_connection_parser.getDropEmptyFromYaml());
+    }
+    SECTION("When config contains false value") {
+      YAML::Node connection_node = YAML::Load(std::string {
+          "drop empty: false\n" });
+      YamlConnectionParser yaml_connection_parser(connection_node, 
"test_node", parent_ptr, logger);
+      REQUIRE(false == yaml_connection_parser.getDropEmptyFromYaml());
+    }
+  }
+  SECTION("Errors are handled properly when configuration lines are missing") {
+    const auto connection = std::make_shared<minifi::Connection>(nullptr, 
nullptr, "name");
+    SECTION("With empty configuration") {
+      YAML::Node connection_node = YAML::Load(std::string(""));
+      YamlConnectionParser yaml_connection_parser(connection_node, 
"test_node", parent_ptr, logger);
+      
CHECK_THROWS(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(connection));
+      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueSizeFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueDataSizeFromYaml());
+      CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml());
+      CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getFlowFileExpirationFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getDropEmptyFromYaml());
+    }
+    SECTION("With a configuration that lists keys but has no assigned values") 
{
+      std::string serialized_yaml;
+      SECTION("Single relationship name left empty") {
+        YAML::Node connection_node = YAML::Load(std::string {
+            "source name: \n"
+            "destination name: \n" });
+        YamlConnectionParser yaml_connection_parser(connection_node, 
"test_node", parent_ptr, logger);
+        // This seems incorrect, but we do not want to ruin backward compatibility
+        
CHECK_NOTHROW(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(connection));
+      }
+      SECTION("List of relationship names contains empty item") {
+        YAML::Node connection_node = YAML::Load(std::string {
+            "source relationship names:\n"
+            "- \n" });
+        YamlConnectionParser yaml_connection_parser(connection_node, 
"test_node", parent_ptr, logger);
+        
CHECK_THROWS(yaml_connection_parser.configureConnectionSourceRelationshipsFromYaml(connection));
+      }
+      SECTION("Source and destination lookup from via id") {
+        YAML::Node connection_node = YAML::Load(std::string {
+            "source id: \n"
+            "destination id: \n" });
+        YamlConnectionParser yaml_connection_parser(connection_node, 
"test_node", parent_ptr, logger);
+        CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml());
+        CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml());
+      }
+      SECTION("Source and destination lookup via name") {
+        YAML::Node connection_node = YAML::Load(std::string {
+            "source name: \n"
+            "destination name: \n" });
+        YamlConnectionParser yaml_connection_parser(connection_node, 
"test_node", parent_ptr, logger);
+        CHECK_THROWS(yaml_connection_parser.getSourceUUIDFromYaml());
+        CHECK_THROWS(yaml_connection_parser.getDestinationUUIDFromYaml());
+      }
+      SECTION("Queue limits and configuration") {
+        YAML::Node connection_node = YAML::Load(std::string {
+            "max work queue size: \n"
+            "max work queue data size: \n"
+            "flowfile expiration: \n"
+            "drop empty: \n"});
+        YamlConnectionParser yaml_connection_parser(connection_node, 
"test_node", parent_ptr, logger);
+        CHECK_THROWS(yaml_connection_parser.getWorkQueueSizeFromYaml());
+        CHECK_THROWS(yaml_connection_parser.getWorkQueueDataSizeFromYaml());
+        CHECK_THROWS(yaml_connection_parser.getFlowFileExpirationFromYaml());
+        CHECK_THROWS(yaml_connection_parser.getDropEmptyFromYaml());
+      }
+    }
+    SECTION("With a configuration that has values of incorrect format") {
+      YAML::Node connection_node = YAML::Load(std::string {
+          "max work queue size: 2 KB\n"
+          "max work queue data size: 10 Incorrect\n"
+          "flowfile expiration: 12\n"
+          "drop empty: sup\n"});
+      YamlConnectionParser yaml_connection_parser(connection_node, 
"test_node", parent_ptr, logger);
+      // This seems incorrect, but we do not want to ruin backward compatibility
+      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueSizeFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getWorkQueueDataSizeFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getFlowFileExpirationFromYaml());
+      CHECK_NOTHROW(yaml_connection_parser.getDropEmptyFromYaml());
+    }
+    SECTION("Known incorrect formats that behave strangely") {
+      YAML::Node connection_node = YAML::Load(std::string {
+          "max work queue data size: 2 Baby Pandas (img, 20 MB) that are cared 
for by a group of 30 giraffes\n"
+          "flowfile expiration: 0\n"
+          "drop empty: NULL\n"});
+      YamlConnectionParser yaml_connection_parser(connection_node, 
"test_node", parent_ptr, logger);
+      REQUIRE(2 == yaml_connection_parser.getWorkQueueDataSizeFromYaml());
+      REQUIRE(0 == yaml_connection_parser.getFlowFileExpirationFromYaml());
+      CHECK_THROWS(yaml_connection_parser.getDropEmptyFromYaml());
+    }
+  }
+}
+
+}  // namespace
diff --git a/libminifi/include/core/FlowFile.h b/libminifi/include/core/FlowFile.h
index d2924bc..ca643ad 100644
--- a/libminifi/include/core/FlowFile.h
+++ b/libminifi/include/core/FlowFile.h
@@ -186,7 +186,7 @@ class FlowFile : public CoreComponent, public ReferenceContainer {
 
   /**
    * Set the size of this record.
-   * @param size size of record to set.Ï
+   * @param size size of record to set.
    */
   void setSize(const uint64_t size) {
     size_ = size;
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index bb98c37..fad30a7 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -206,7 +206,7 @@ class ProcessGroup : public CoreComponent {
    * @param nodeId node identifier
    * @param node controller service node.
    */
-  void addControllerService(const std::string &nodeId, 
std::shared_ptr<core::controller::ControllerServiceNode> &node);
+  void addControllerService(const std::string &nodeId, const 
std::shared_ptr<core::controller::ControllerServiceNode> &node);
 
   /**
    * Find controllerservice node will search child groups until the nodeId is 
found.
diff --git a/libminifi/include/core/yaml/CheckRequiredField.h b/libminifi/include/core/yaml/CheckRequiredField.h
new file mode 100644
index 0000000..7029166
--- /dev/null
+++ b/libminifi/include/core/yaml/CheckRequiredField.h
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <memory>
+
+#include "core/logging/LoggerConfiguration.h"
+#include "yaml-cpp/yaml.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace yaml {
+
+/**
+ * This is a helper function for verifying the existence of a required
+ * field in a YAML::Node object. If the field is not present, an error
+ * message will be logged and an std::invalid_argument exception will be
+ * thrown indicating the absence of the required field in the YAML node.
+ *
+ * @param yamlNode     the YAML node to check
+ * @param fieldName    the required field key
+ * @param yamlSection  [optional] the top level section of the YAML config
+ *                       for the yamlNode. This is used for generating a
+ *                       useful error message for troubleshooting.
+ * @param errorMessage [optional] the error message string to use if
+ *                       the required field is missing. If not provided,
+ *                       a default error message will be generated.
+ *
+ * @throws std::invalid_argument if the required field 'fieldName' is
+ *                               not present in 'yamlNode'
+ */
+void checkRequiredField(
+    const YAML::Node *yamlNode, const std::string &fieldName, const 
std::shared_ptr<logging::Logger>& logger, const std::string &yamlSection = "", 
const std::string &errorMessage = "");
+
+}  // namespace yaml
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h
index 83aa204..8342ccd 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -34,6 +34,8 @@
 #include "utils/OptionalUtils.h"
 #include "yaml-cpp/yaml.h"
 
+class YamlConfigurationTestAccessor;
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -42,7 +44,6 @@ namespace core {
 
 #define CONFIG_YAML_FLOW_CONTROLLER_KEY "Flow Controller"
 #define CONFIG_YAML_PROCESSORS_KEY "Processors"
-#define CONFIG_YAML_CONNECTIONS_KEY "Connections"
 #define CONFIG_YAML_CONTROLLER_SERVICES_KEY "Controller Services"
 #define CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY "Remote Processing Groups"
 #define CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3 "Remote Process Groups"
@@ -155,36 +156,7 @@ class YamlConfiguration : public FlowConfiguration {
    * @return             the root ProcessGroup node of the flow
    *                       configuration tree
    */
-  std::unique_ptr<core::ProcessGroup> getYamlRoot(YAML::Node *rootYamlNode) {
-    YAML::Node rootYaml = *rootYamlNode;
-    YAML::Node flowControllerNode = rootYaml[CONFIG_YAML_FLOW_CONTROLLER_KEY];
-    YAML::Node processorsNode = rootYaml[CONFIG_YAML_PROCESSORS_KEY];
-    YAML::Node connectionsNode = rootYaml[CONFIG_YAML_CONNECTIONS_KEY];
-    YAML::Node controllerServiceNode = 
rootYaml[CONFIG_YAML_CONTROLLER_SERVICES_KEY];
-    YAML::Node remoteProcessingGroupsNode = 
rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
-
-    if (!remoteProcessingGroupsNode) {
-      remoteProcessingGroupsNode = 
rootYaml[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3];
-    }
-
-    YAML::Node provenanceReportNode = 
rootYaml[CONFIG_YAML_PROVENANCE_REPORT_KEY];
-
-    parseControllerServices(&controllerServiceNode);
-    // Create the root process group
-    core::ProcessGroup *root = parseRootProcessGroupYaml(flowControllerNode);
-    parseProcessorNodeYaml(processorsNode, root);
-    parseRemoteProcessGroupYaml(&remoteProcessingGroupsNode, root);
-    parseConnectionYaml(&connectionsNode, root);
-    parseProvenanceReportingYaml(&provenanceReportNode, root);
-
-    // set the controller services into the root group.
-    for (auto controller_service : 
controller_services_->getAllControllerServices()) {
-      root->addControllerService(controller_service->getName(), 
controller_service);
-      root->addControllerService(controller_service->getUUIDStr(), 
controller_service);
-    }
-
-    return std::unique_ptr<core::ProcessGroup>(root);
-  }
+  std::unique_ptr<core::ProcessGroup> getYamlRoot(YAML::Node *rootYamlNode);
 
   /**
    * Parses a processor from its corresponding YAML config node and adds
@@ -220,7 +192,7 @@ class YamlConfiguration : public FlowConfiguration {
    * @param rootNode
    * @return
    */
-  core::ProcessGroup *parseRootProcessGroupYaml(YAML::Node rootNode);
+  std::unique_ptr<core::ProcessGroup> parseRootProcessGroupYaml(YAML::Node 
rootNode);
 
   // Process Property YAML
   void parseProcessorPropertyYaml(YAML::Node *doc, YAML::Node *node, 
std::shared_ptr<core::Processor> processor);
@@ -230,7 +202,6 @@ class YamlConfiguration : public FlowConfiguration {
    * @param parent parent process group.
    */
   void parseControllerServices(YAML::Node *controllerServicesNode);
-  // Process connection YAML
 
   /**
    * Parses the Connections section of a configuration YAML.
@@ -293,26 +264,6 @@ class YamlConfiguration : public FlowConfiguration {
   std::string getOrGenerateId(YAML::Node *yamlNode, const std::string &idField 
= "id");
 
   /**
-   * This is a helper function for verifying the existence of a required
-   * field in a YAML::Node object. If the field is not present, an error
-   * message will be logged and a std::invalid_argument exception will be
-   * thrown indicating the absence of the required field in the YAML node.
-   *
-   * @param yamlNode     the YAML node to check
-   * @param fieldName    the required field key
-   * @param yamlSection  [optional] the top level section of the YAML config
-   *                       for the yamlNode. This is used fpr generating a
-   *                       useful error message for troubleshooting.
-   * @param errorMessage [optional] the error message string to use if
-   *                       the required field is missing. If not provided,
-   *                       a default error message will be generated.
-   *
-   * @throws std::invalid_argument if the required field 'fieldName' is
-   *                               not present in 'yamlNode'
-   */
-  void checkRequiredField(YAML::Node *yamlNode, const std::string &fieldName, 
const std::string &yamlSection = "", const std::string &errorMessage = "");
-
-  /**
    * This is a helper function for getting an optional value, if it exists.
    * If it does not exist, returns the provided default value.
    *
diff --git a/libminifi/include/core/yaml/YamlConnectionParser.h b/libminifi/include/core/yaml/YamlConnectionParser.h
new file mode 100644
index 0000000..512b928
--- /dev/null
+++ b/libminifi/include/core/yaml/YamlConnectionParser.h
@@ -0,0 +1,66 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "core/ProcessGroup.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#include "yaml-cpp/yaml.h"
+#include "utils/gsl.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace yaml {
+
+class YamlConnectionParser {
+ public:
+  static constexpr const char* CONFIG_YAML_CONNECTIONS_KEY{ "Connections" };
+
+  explicit YamlConnectionParser(const YAML::Node& connectionNode, const 
std::string& name, gsl::not_null<core::ProcessGroup*> parent, const 
std::shared_ptr<logging::Logger>& logger) :
+      connectionNode_(connectionNode),
+      name_(name),
+      parent_(parent),
+      logger_(logger) {}
+
+  void configureConnectionSourceRelationshipsFromYaml(const 
std::shared_ptr<minifi::Connection>& connection) const;
+  uint64_t getWorkQueueSizeFromYaml() const;
+  uint64_t getWorkQueueDataSizeFromYaml() const;
+  utils::Identifier getSourceUUIDFromYaml() const;
+  utils::Identifier getDestinationUUIDFromYaml() const;
+  uint64_t getFlowFileExpirationFromYaml() const;
+  bool getDropEmptyFromYaml() const;
+ private:
+  const YAML::Node& connectionNode_;
+  const std::string& name_;
+  gsl::not_null<core::ProcessGroup*> parent_;
+  const std::shared_ptr<logging::Logger> logger_;
+};
+
+}  // namespace yaml
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/include/utils/Id.h b/libminifi/include/utils/Id.h
index e724ace..687d19f 100644
--- a/libminifi/include/utils/Id.h
+++ b/libminifi/include/utils/Id.h
@@ -54,7 +54,6 @@ namespace utils {
 
 class Identifier {
   friend struct IdentifierTestAccessor;
-  static constexpr const char* UUID_FORMAT_STRING = 
"%02hhx%02hhx%02hhx%02hhx-%02hhx%02hhx-%02hhx%02hhx-%02hhx%02hhx-%02hhx%02hhx%02hhx%02hhx%02hhx%02hhx";
   static constexpr const char* hex_lut = "0123456789abcdef";
 
  public:
diff --git a/libminifi/include/utils/TestUtils.h 
b/libminifi/include/utils/TestUtils.h
index f62cef8..88c8d43 100644
--- a/libminifi/include/utils/TestUtils.h
+++ b/libminifi/include/utils/TestUtils.h
@@ -19,10 +19,12 @@
 #pragma once
 
 #include <string>
+#include <memory>
 
 #include "../../test/TestBase.h"
 #include "utils/file/FileUtils.h"
 #include "utils/Environment.h"
+#include "utils/Id.h"
 #include "utils/TimeUtil.h"
 
 namespace org {
@@ -66,6 +68,12 @@ std::string getFileContent(const std::string& file_name) {
   return file_content;
 }
 
+/**
+ * Returns the identifier produced by the generator obtained from utils::IdGenerator::getIdGenerator().
+ * Declared inline because the function is defined in a header that can be included
+ * from more than one translation unit.
+ */
+inline Identifier generateUUID() {
+  // TODO(hunyadi): Will make the Id generator manage lifetime using a unique_ptr and return a raw ptr on access
+  static std::shared_ptr<utils::IdGenerator> id_generator = utils::IdGenerator::getIdGenerator();
+  return id_generator->generate();
+}
+
 class ManualClock : public timeutils::Clock {
  public:
   std::chrono::milliseconds timeSinceEpoch() const override { return time_; }
diff --git a/libminifi/src/core/ProcessGroup.cpp 
b/libminifi/src/core/ProcessGroup.cpp
index 892cdb1..ba28246 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -272,7 +272,7 @@ std::shared_ptr<Processor> 
ProcessGroup::findProcessorByName(const std::string &
   return findProcessor(name_matches);
 }
 
-void ProcessGroup::addControllerService(const std::string &nodeId, 
std::shared_ptr<core::controller::ControllerServiceNode> &node) {
+void ProcessGroup::addControllerService(const std::string &nodeId, const 
std::shared_ptr<core::controller::ControllerServiceNode> &node) {
   controller_service_map_.put(nodeId, node);
 }
 
@@ -352,11 +352,13 @@ void ProcessGroup::addConnection(const 
std::shared_ptr<Connection>& connection)
     connections_.insert(connection);
     logger_->log_debug("Add connection %s into process group %s", 
connection->getName(), name_);
     std::shared_ptr<Processor> source = 
this->findProcessorById(connection->getSourceUUID());
-    if (source)
+    if (source) {
       source->addConnection(connection);
+    }
     std::shared_ptr<Processor> destination = 
this->findProcessorById(connection->getDestinationUUID());
-    if (destination && destination != source)
+    if (destination && destination != source) {
       destination->addConnection(connection);
+    }
   }
 }
 
diff --git a/libminifi/src/core/yaml/CheckRequiredField.cpp 
b/libminifi/src/core/yaml/CheckRequiredField.cpp
new file mode 100644
index 0000000..9ea47d7
--- /dev/null
+++ b/libminifi/src/core/yaml/CheckRequiredField.cpp
@@ -0,0 +1,59 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdexcept>
+
+#include "core/yaml/CheckRequiredField.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace yaml {
+
+/**
+ * Verifies that a YAML node contains the given field.
+ *
+ * When the field is missing, the error is logged and an exception is thrown. If no custom
+ * errorMessage is supplied, a message is assembled from the node's "name" field (when present),
+ * the section name (when not empty) and the position mark of the node (when available).
+ *
+ * @param yamlNode node expected to contain the field
+ * @param fieldName key that has to be present in yamlNode
+ * @param logger logger receiving the error message
+ * @param yamlSection name of the configuration section, appended to the generated message
+ * @param errorMessage message used instead of the generated one when not empty
+ * @throws std::invalid_argument if the field is missing
+ */
+void checkRequiredField(const YAML::Node *yamlNode, const std::string &fieldName, const std::shared_ptr<logging::Logger>& logger, const std::string &yamlSection, const std::string &errorMessage) {
+  if (yamlNode->as<YAML::Node>()[fieldName]) {
+    return;
+  }
+  // Only the failure path needs a modifiable copy of the message
+  std::string errMsg = errorMessage;
+  if (errMsg.empty()) {
+    const YAML::Node name_node = yamlNode->as<YAML::Node>()["name"];
+    // Build a helpful error message for the user so they can fix the
+    // invalid YAML config file, using the component name if present
+    errMsg =
+        name_node ?
+            "Unable to parse configuration file for component named '" + name_node.as<std::string>() + "' as required field '" + fieldName + "' is missing" :
+            "Unable to parse configuration file as required field '" + fieldName + "' is missing";
+    if (!yamlSection.empty()) {
+      errMsg += " [in '" + yamlSection + "' section of configuration file]";
+    }
+    const YAML::Mark mark = yamlNode->Mark();
+    if (!mark.is_null()) {
+      errMsg += " [line:column, pos at " + std::to_string(mark.line) + ":" + std::to_string(mark.column) + ", " + std::to_string(mark.pos) + "]";
+    }
+  }
+  // The message can contain text taken from the configuration file, so it is passed as an
+  // argument instead of being used as the format string itself
+  logger->log_error("%s", errMsg);
+  throw std::invalid_argument(errMsg);
+}
+
+}  // namespace yaml
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp 
b/libminifi/src/core/yaml/YamlConfiguration.cpp
index dbb2b4f..111dd8c 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -22,6 +22,8 @@
 #include <cinttypes>
 
 #include "core/yaml/YamlConfiguration.h"
+#include "core/yaml/CheckRequiredField.h"
+#include "core/yaml/YamlConnectionParser.h"
 #include "core/state/Value.h"
 #include "Defaults.h"
 
@@ -46,11 +48,11 @@ YamlConfiguration::YamlConfiguration(const 
std::shared_ptr<core::Repository>& re
       stream_factory_(stream_factory),
       logger_(logging::LoggerFactory<YamlConfiguration>::getLogger()) {}
 
-core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node 
rootFlowNode) {
+std::unique_ptr<core::ProcessGroup> 
YamlConfiguration::parseRootProcessGroupYaml(YAML::Node rootFlowNode) {
   utils::Identifier uuid;
   int version = 0;
 
-  checkRequiredField(&rootFlowNode, "name",
+  yaml::checkRequiredField(&rootFlowNode, "name", logger_,
   CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
   std::string flowName = rootFlowNode["name"].as<std::string>();
 
@@ -88,9 +90,40 @@ core::ProcessGroup 
*YamlConfiguration::parseRootProcessGroupYaml(YAML::Node root
     }
   }
 
-  return group.release();
+  return group;
 }
 
+/**
+ * Builds the root process group from the top-level YAML document.
+ *
+ * The controller services section is parsed first. The root group is then created from the
+ * flow controller section and handed, in this order, to the processor, remote process group,
+ * connection and provenance reporting parsers. Finally every controller service is added to
+ * the root group under both its name and its UUID string.
+ *
+ * @param rootYamlNode top-level node of the configuration; dereferenced without a null check
+ * @return the root process group
+ */
+std::unique_ptr<core::ProcessGroup> YamlConfiguration::getYamlRoot(YAML::Node *rootYamlNode) {
+  YAML::Node document = *rootYamlNode;
+  YAML::Node flowControllerSection = document[CONFIG_YAML_FLOW_CONTROLLER_KEY];
+  YAML::Node processorsSection = document[CONFIG_YAML_PROCESSORS_KEY];
+  YAML::Node connectionsSection = document[yaml::YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY];
+  YAML::Node controllerServicesSection = document[CONFIG_YAML_CONTROLLER_SERVICES_KEY];
+  YAML::Node remoteGroupsSection = document[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
+
+  // Fall back to the alternative (V3) key when the primary key is absent
+  if (!remoteGroupsSection) {
+    remoteGroupsSection = document[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3];
+  }
+
+  YAML::Node provenanceSection = document[CONFIG_YAML_PROVENANCE_REPORT_KEY];
+
+  parseControllerServices(&controllerServicesSection);
+  // Create the root process group
+  std::unique_ptr<core::ProcessGroup> rootGroup = parseRootProcessGroupYaml(flowControllerSection);
+  core::ProcessGroup* const rootGroupPtr = rootGroup.get();
+  parseProcessorNodeYaml(processorsSection, rootGroupPtr);
+  parseRemoteProcessGroupYaml(&remoteGroupsSection, rootGroupPtr);
+  parseConnectionYaml(&connectionsSection, rootGroupPtr);
+  parseProvenanceReportingYaml(&provenanceSection, rootGroupPtr);
+
+  // set the controller services into the root group.
+  for (const auto& service : controller_services_->getAllControllerServices()) {
+    rootGroup->addControllerService(service->getName(), service);
+    rootGroup->addControllerService(service->getUUIDStr(), service);
+  }
+
+  return rootGroup;
+}
+
 void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, 
core::ProcessGroup *parentGroup) {
   int64_t schedulingPeriod = -1;
   int64_t penalizationPeriod = -1;
@@ -111,7 +144,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node 
processorsNode, core::
         core::ProcessorConfig procCfg;
         YAML::Node procNode = iter->as<YAML::Node>();
 
-        checkRequiredField(&procNode, "name",
+        yaml::checkRequiredField(&procNode, "name", logger_,
         CONFIG_YAML_PROCESSORS_KEY);
         procCfg.name = procNode["name"].as<std::string>();
         procCfg.id = getOrGenerateId(&procNode);
@@ -126,7 +159,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node 
processorsNode, core::
 
         uuid = procCfg.id.c_str();
         logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", 
procCfg.name, procCfg.id);
-        checkRequiredField(&procNode, "class", CONFIG_YAML_PROCESSORS_KEY);
+        yaml::checkRequiredField(&procNode, "class", logger_, 
CONFIG_YAML_PROCESSORS_KEY);
         procCfg.javaClass = procNode["class"].as<std::string>();
         logger_->log_debug("parseProcessorNode: class => [%s]", 
procCfg.javaClass);
 
@@ -281,7 +314,7 @@ void 
YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
       for (YAML::const_iterator iter = rpgNode->begin(); iter != 
rpgNode->end(); ++iter) {
         YAML::Node currRpgNode = iter->as<YAML::Node>();
 
-        checkRequiredField(&currRpgNode, "name",
+        yaml::checkRequiredField(&currRpgNode, "name", logger_,
         CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
         auto name = currRpgNode["name"].as<std::string>();
         id = getOrGenerateId(&currRpgNode);
@@ -369,7 +402,7 @@ void 
YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
         group->setTransmitting(true);
         group->setURL(url);
 
-        checkRequiredField(&currRpgNode, "Input Ports",
+        yaml::checkRequiredField(&currRpgNode, "Input Ports", logger_,
         CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
         YAML::Node inputPorts = currRpgNode["Input Ports"].as<YAML::Node>();
         if (inputPorts && inputPorts.IsSequence()) {
@@ -414,10 +447,10 @@ void 
YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor
 
   YAML::Node node = reportNode->as<YAML::Node>();
 
-  checkRequiredField(&node, "scheduling strategy",
+  yaml::checkRequiredField(&node, "scheduling strategy", logger_,
   CONFIG_YAML_PROVENANCE_REPORT_KEY);
   auto schedulingStrategyStr = node["scheduling strategy"].as<std::string>();
-  checkRequiredField(&node, "scheduling period",
+  yaml::checkRequiredField(&node, "scheduling period", logger_,
   CONFIG_YAML_PROVENANCE_REPORT_KEY);
   auto schedulingPeriodStr = node["scheduling period"].as<std::string>();
 
@@ -453,9 +486,9 @@ void 
YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor
       logger_->log_debug("ProvenanceReportingTask URL %s", urlStr);
     }
   }
-  checkRequiredField(&node, "port uuid", CONFIG_YAML_PROVENANCE_REPORT_KEY);
+  yaml::checkRequiredField(&node, "port uuid", logger_, 
CONFIG_YAML_PROVENANCE_REPORT_KEY);
   auto portUUIDStr = node["port uuid"].as<std::string>();
-  checkRequiredField(&node, "batch size", CONFIG_YAML_PROVENANCE_REPORT_KEY);
+  yaml::checkRequiredField(&node, "batch size", logger_, 
CONFIG_YAML_PROVENANCE_REPORT_KEY);
   auto batchSizeStr = node["batch size"].as<std::string>();
 
   logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr);
@@ -479,17 +512,17 @@ void 
YamlConfiguration::parseControllerServices(YAML::Node *controllerServicesNo
       for (auto iter : *controllerServicesNode) {
         YAML::Node controllerServiceNode = iter.as<YAML::Node>();
         try {
-          checkRequiredField(&controllerServiceNode, "name",
+          yaml::checkRequiredField(&controllerServiceNode, "name", logger_,
           CONFIG_YAML_CONTROLLER_SERVICES_KEY);
-          checkRequiredField(&controllerServiceNode, "id",
+          yaml::checkRequiredField(&controllerServiceNode, "id", logger_,
           CONFIG_YAML_CONTROLLER_SERVICES_KEY);
           std::string type = "";
 
           try {
-            checkRequiredField(&controllerServiceNode, "class", 
CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+            yaml::checkRequiredField(&controllerServiceNode, "class", logger_, 
CONFIG_YAML_CONTROLLER_SERVICES_KEY);
             type = controllerServiceNode["class"].as<std::string>();
           } catch (const std::invalid_argument &) {
-            checkRequiredField(&controllerServiceNode, "type", 
CONFIG_YAML_CONTROLLER_SERVICES_KEY);
+            yaml::checkRequiredField(&controllerServiceNode, "type", logger_, 
CONFIG_YAML_CONTROLLER_SERVICES_KEY);
             type = controllerServiceNode["type"].as<std::string>();
             logger_->log_debug("Using type %s for controller service node", 
type);
           }
@@ -530,177 +563,44 @@ void 
YamlConfiguration::parseControllerServices(YAML::Node *controllerServicesNo
   }
 }
 
-void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, 
core::ProcessGroup *parent) {
+void YamlConfiguration::parseConnectionYaml(YAML::Node* connectionsNode, 
core::ProcessGroup* parent) {
   if (!parent) {
     logger_->log_error("parseProcessNode: no parent group was provided");
     return;
   }
+  if (!connectionsNode || !connectionsNode->IsSequence()) {
+    return;
+  }
 
-  if (connectionsNode) {
-    if (connectionsNode->IsSequence()) {
-      for (YAML::const_iterator iter = connectionsNode->begin(); iter != 
connectionsNode->end(); ++iter) {
-        YAML::Node connectionNode = iter->as<YAML::Node>();
-        std::shared_ptr<minifi::Connection> connection = nullptr;
-
-        // Configure basic connection
-        utils::Identifier uuid;
-        std::string id = getOrGenerateId(&connectionNode);
-
-        // Default name to be same as ID
-        std::string name = id;
-
-        // If name is specified in configuration, use the value
-        if (connectionNode["name"]) {
-          name = connectionNode["name"].as<std::string>();
-        }
-
-        uuid = id;
-        connection = this->createConnection(name, uuid);
-        logger_->log_debug("Created connection with UUID %s and name %s", id, 
name);
-
-        // Configure connection source
-        if (connectionNode.as<YAML::Node>()["source relationship name"]) {
-          auto rawRelationship = connectionNode["source relationship 
name"].as<std::string>();
-          core::Relationship relationship(rawRelationship, "");
-          logger_->log_debug("parseConnection: relationship => [%s]", 
rawRelationship);
-          if (connection) {
-            connection->addRelationship(relationship);
-          }
-        } else if (connectionNode.as<YAML::Node>()["source relationship 
names"]) {
-          auto relList = connectionNode["source relationship names"];
-          if (connection) {
-            if (relList.IsSequence()) {
-              for (const auto &rel : relList) {
-                auto rawRelationship = rel.as<std::string>();
-                core::Relationship relationship(rawRelationship, "");
-                logger_->log_debug("parseConnection: relationship => [%s]", 
rawRelationship);
-                connection->addRelationship(relationship);
-              }
-            } else {
-              auto rawRelationship = relList.as<std::string>();
-              core::Relationship relationship(rawRelationship, "");
-              logger_->log_debug("parseConnection: relationship => [%s]", 
rawRelationship);
-              connection->addRelationship(relationship);
-            }
-          }
-        }
-
-        utils::Identifier srcUUID;
+  for (YAML::const_iterator iter = connectionsNode->begin(); iter != 
connectionsNode->end(); ++iter) {
+    YAML::Node connectionNode = iter->as<YAML::Node>();
+    std::shared_ptr<minifi::Connection> connection = nullptr;
 
-        if (connectionNode["max work queue size"]) {
-          auto max_work_queue_str = connectionNode["max work queue 
size"].as<std::string>();
-          uint64_t max_work_queue_size = 0;
-          if (core::Property::StringToInt(max_work_queue_str, 
max_work_queue_size)) {
-            connection->setMaxQueueSize(max_work_queue_size);
-          }
-          logging::LOG_DEBUG(logger_) << "Setting " << max_work_queue_size << 
" as the max queue size for " << name;
-        }
+    // Configure basic connection
+    std::string id = getOrGenerateId(&connectionNode);
 
-        if (connectionNode["max work queue data size"]) {
-          auto max_work_queue_str = connectionNode["max work queue data 
size"].as<std::string>();
-          uint64_t max_work_queue_data_size = 0;
-          if (core::Property::StringToInt(max_work_queue_str, 
max_work_queue_data_size)) {
-            connection->setMaxQueueDataSize(max_work_queue_data_size);
-          }
-          logging::LOG_DEBUG(logger_) << "Setting " << 
max_work_queue_data_size << " as the max queue data size for " << name;
-        }
+    // Default name to be same as ID
+    // If name is specified in configuration, use the value
+    std::string name = connectionNode["name"].as<std::string>(id);
 
-        if (connectionNode["source id"]) {
-          std::string connectionSrcProcId = connectionNode["source 
id"].as<std::string>();
-          srcUUID = connectionSrcProcId;
-          logger_->log_debug("Using 'source id' to match source with same id 
for "
-                             "connection '%s': source id => [%s]",
-                             name, connectionSrcProcId);
-        } else {
-          // if we don't have a source id, try to resolve using source name. 
config schema v2 will make this unnecessary
-          checkRequiredField(&connectionNode, "source name",
-          CONFIG_YAML_CONNECTIONS_KEY);
-          std::string connectionSrcProcName = connectionNode["source 
name"].as<std::string>();
-          utils::optional<utils::Identifier> tmpUUID = 
utils::Identifier::parse(connectionSrcProcName);
-          if (tmpUUID && NULL != parent->findProcessorById(tmpUUID.value())) {
-            // the source name is a remote port id, so use that as the source 
id
-            srcUUID = tmpUUID.value();
-            logger_->log_debug("Using 'source name' containing a remote port 
id to match the source for "
-                               "connection '%s': source name => [%s]",
-                               name, connectionSrcProcName);
-          } else {
-            // lastly, look the processor up by name
-            auto srcProcessor = 
parent->findProcessorByName(connectionSrcProcName);
-            if (NULL != srcProcessor) {
-              srcUUID = srcProcessor->getUUID();
-              logger_->log_debug("Using 'source name' to match source with 
same name for "
-                                 "connection '%s': source name => [%s]",
-                                 name, connectionSrcProcName);
-            } else {
-              // we ran out of ways to discover the source processor
-              logger_->log_error("Could not locate a source with name %s to 
create a connection", connectionSrcProcName);
-              throw std::invalid_argument("Could not locate a source with name 
" + connectionSrcProcName + " to create a connection ");
-            }
-          }
-        }
-        connection->setSourceUUID(srcUUID);
-
-        // Configure connection destination
-        utils::Identifier destUUID;
-        if (connectionNode["destination id"]) {
-          std::string connectionDestProcId = connectionNode["destination 
id"].as<std::string>();
-          destUUID = connectionDestProcId;
-          logger_->log_debug("Using 'destination id' to match destination with 
same id for "
-                             "connection '%s': destination id => [%s]",
-                             name, connectionDestProcId);
-        } else {
-          // we use the same logic as above for resolving the source processor
-          // for looking up the destination processor in absence of a 
processor id
-          checkRequiredField(&connectionNode, "destination name",
-          CONFIG_YAML_CONNECTIONS_KEY);
-          std::string connectionDestProcName = connectionNode["destination 
name"].as<std::string>();
-          utils::optional<utils::Identifier> tmpUUID = 
utils::Identifier::parse(connectionDestProcName);
-          if (tmpUUID && parent->findProcessorById(tmpUUID.value())) {
-            // the destination name is a remote port id, so use that as the 
dest id
-            destUUID = tmpUUID.value();
-            logger_->log_debug("Using 'destination name' containing a remote 
port id to match the destination for "
-                               "connection '%s': destination name => [%s]",
-                               name, connectionDestProcName);
-          } else {
-            // look the processor up by name
-            auto destProcessor = 
parent->findProcessorByName(connectionDestProcName);
-            if (NULL != destProcessor) {
-              destUUID = destProcessor->getUUID();
-              logger_->log_debug("Using 'destination name' to match 
destination with same name for "
-                                 "connection '%s': destination name => [%s]",
-                                 name, connectionDestProcName);
-            } else {
-              // we ran out of ways to discover the destination processor
-              logger_->log_error("Could not locate a destination with name %s 
to create a connection", connectionDestProcName);
-              throw std::invalid_argument("Could not locate a destination with 
name " + connectionDestProcName + " to create a connection");
-            }
-          }
-        }
-        connection->setDestinationUUID(destUUID);
-
-        if (connectionNode["flowfile expiration"]) {
-          uint64_t expirationDuration = 0;
-          std::string expiration = connectionNode["flowfile 
expiration"].as<std::string>();
-          TimeUnit unit;
-          if (core::Property::StringToTime(expiration, expirationDuration, 
unit) && core::Property::ConvertTimeUnitToMS(expirationDuration, unit, 
expirationDuration)) {
-            logger_->log_debug("parseConnection: flowfile expiration => [%d]", 
expirationDuration);
-            connection->setFlowExpirationDuration(expirationDuration);
-          }
-        }
+    const utils::optional<utils::Identifier> uuid = 
utils::Identifier::parse(id);
+    if (!uuid) {
+      logger_->log_debug("Incorrect connection UUID format.");
+      throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect connection 
UUID format.");
+    }
 
-        if (connectionNode["drop empty"]) {
-          std::string strvalue = connectionNode["drop 
empty"].as<std::string>();
-          bool dropEmpty = false;
-          if (utils::StringUtils::StringToBool(strvalue, dropEmpty)) {
-            connection->setDropEmptyFlowFiles(dropEmpty);
-          }
-        }
+    connection = createConnection(name, uuid.value());
+    logger_->log_debug("Created connection with UUID %s and name %s", id, 
name);
+    const yaml::YamlConnectionParser connectionParser(connectionNode, name, 
gsl::not_null<core::ProcessGroup*>{ parent }, logger_);
+    
connectionParser.configureConnectionSourceRelationshipsFromYaml(connection);
+    connection->setMaxQueueSize(connectionParser.getWorkQueueSizeFromYaml());
+    
connection->setMaxQueueDataSize(connectionParser.getWorkQueueDataSizeFromYaml());
+    connection->setSourceUUID(connectionParser.getSourceUUIDFromYaml());
+    
connection->setDestinationUUID(connectionParser.getDestinationUUIDFromYaml());
+    
connection->setFlowExpirationDuration(connectionParser.getFlowFileExpirationFromYaml());
+    connection->setDropEmptyFlowFiles(connectionParser.getDropEmptyFromYaml());
 
-        if (connection) {
-          parent->addConnection(connection);
-        }
-      }
-    }
+    parent->addConnection(connection);
   }
 }
 
@@ -717,10 +617,10 @@ void YamlConfiguration::parsePortYaml(YAML::Node 
*portNode, core::ProcessGroup *
   YAML::Node inputPortsObj = portNode->as<YAML::Node>();
 
   // Check for required fields
-  checkRequiredField(&inputPortsObj, "name",
+  yaml::checkRequiredField(&inputPortsObj, "name", logger_,
   CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
   auto nameStr = inputPortsObj["name"].as<std::string>();
-  checkRequiredField(&inputPortsObj, "id",
+  yaml::checkRequiredField(&inputPortsObj, "id", logger_,
   CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY,
                      "The field 'id' is required for "
                          "the port named '" + nameStr + "' in the YAML Config. 
If this port "
@@ -989,26 +889,6 @@ std::string YamlConfiguration::getOrGenerateId(YAML::Node 
*yamlNode, const std::
   return id;
 }
 
-void YamlConfiguration::checkRequiredField(YAML::Node *yamlNode, const 
std::string &fieldName, const std::string &yamlSection, const std::string 
&errorMessage) {
-  std::string errMsg = errorMessage;
-  if (!yamlNode->as<YAML::Node>()[fieldName]) {
-    if (errMsg.empty()) {
-      // Build a helpful error message for the user so they can fix the
-      // invalid YAML config file, using the component name if present
-      errMsg =
-          yamlNode->as<YAML::Node>()["name"] ?
-              "Unable to parse configuration file for component named '" + 
yamlNode->as<YAML::Node>()["name"].as<std::string>() + "' as required field '" 
+ fieldName + "' is missing" :
-              "Unable to parse configuration file as required field '" + 
fieldName + "' is missing";
-      if (!yamlSection.empty()) {
-        errMsg += " [in '" + yamlSection + "' section of configuration file]";
-      }
-    }
-    logging::LOG_ERROR(logger_) << errMsg;
-
-    throw std::invalid_argument(errMsg);
-  }
-}
-
 YAML::Node YamlConfiguration::getOptionalField(YAML::Node *yamlNode, const 
std::string &fieldName, const YAML::Node &defaultValue, const std::string 
&yamlSection,
                                                const std::string 
&providedInfoMessage) {
   std::string infoMessage = providedInfoMessage;
diff --git a/libminifi/src/core/yaml/YamlConnectionParser.cpp 
b/libminifi/src/core/yaml/YamlConnectionParser.cpp
new file mode 100644
index 0000000..cd26117
--- /dev/null
+++ b/libminifi/src/core/yaml/YamlConnectionParser.cpp
@@ -0,0 +1,182 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "core/yaml/YamlConnectionParser.h"
+#include "core/yaml/CheckRequiredField.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+namespace yaml {
+
+// Out-of-class definition of the static constexpr member: required before C++17 when the member is odr-used, redundant from C++17 on
+constexpr const char* YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY;
+
+/**
+ * Adds the source relationship(s) listed in the connection's YAML node to the connection.
+ *
+ * "source relationship name" takes precedence; "source relationship names" is only consulted
+ * when the former is absent and may hold either a sequence of names or a single name.
+ *
+ * @param connection connection that receives the relationships
+ */
+void YamlConnectionParser::configureConnectionSourceRelationshipsFromYaml(const std::shared_ptr<minifi::Connection>& connection) const {
+  const auto registerRelationship = [this, &connection] (const std::string& relationship_name) {
+    core::Relationship relationship(relationship_name, "");
+    logger_->log_debug("parseConnection: relationship => [%s]", relationship_name);
+    connection->addRelationship(std::move(relationship));
+  };
+
+  // Single relationship given under the singular key
+  if (connectionNode_.as<YAML::Node>()["source relationship name"]) {
+    registerRelationship(connectionNode_["source relationship name"].as<std::string>());
+    return;
+  }
+
+  // Nothing to configure when the plural key is missing as well
+  if (!connectionNode_.as<YAML::Node>()["source relationship names"]) {
+    return;
+  }
+
+  auto names = connectionNode_["source relationship names"];
+  if (!names.IsSequence()) {
+    // The plural key may also carry a single scalar value
+    registerRelationship(names.as<std::string>());
+    return;
+  }
+  for (const auto& entry : names) {
+    registerRelationship(entry.as<std::string>());
+  }
+}
+
+/**
+ * Reads the "max work queue size" attribute of the connection node.
+ *
+ * @return the parsed value, or 0 when the attribute is missing or cannot be converted
+ */
+uint64_t YamlConnectionParser::getWorkQueueSizeFromYaml() const {
+  const YAML::Node max_work_queue_size_node = connectionNode_["max work queue size"];
+  if (max_work_queue_size_node) {
+    auto max_work_queue_str = max_work_queue_size_node.as<std::string>();
+    uint64_t max_work_queue_size = 0;
+    if (core::Property::StringToInt(max_work_queue_str, max_work_queue_size)) {
+      logger_->log_debug("Setting %" PRIu64 " as the max queue size.", max_work_queue_size);
+      return max_work_queue_size;
+    }
+    // An unparsable value is only reported; the caller receives 0
+    logger_->log_info("Invalid max queue size value: %s.", max_work_queue_str);
+  }
+  return 0;
+}
+
+/**
+ * Reads the "max work queue data size" attribute of the connection node.
+ *
+ * @return the parsed value, or 0 when the attribute is missing or cannot be converted
+ */
+uint64_t YamlConnectionParser::getWorkQueueDataSizeFromYaml() const {
+  const YAML::Node max_work_queue_data_size_node = connectionNode_["max work queue data size"];
+  if (max_work_queue_data_size_node) {
+    auto max_work_queue_str = max_work_queue_data_size_node.as<std::string>();
+    uint64_t max_work_queue_data_size = 0;
+    if (core::Property::StringToInt(max_work_queue_str, max_work_queue_data_size)) {
+      logger_->log_debug("Setting %" PRIu64 " as the max queue data size.", max_work_queue_data_size);
+      return max_work_queue_data_size;
+    }
+    // An unparsable value is only reported; the caller receives 0
+    logger_->log_info("Invalid max queue data size value: %s.", max_work_queue_str);
+  }
+  return 0;
+}
+
+/**
+ * Determines the identifier of the connection's source.
+ *
+ * Resolution order:
+ *  1. "source id", which has to be a parsable identifier;
+ *  2. "source name" when it parses as an identifier that the parent group can find by id;
+ *  3. "source name" looked up as a processor name in the parent group.
+ *
+ * @return the identifier of the source
+ * @throws std::invalid_argument if "source id" is malformed, if neither "source id" nor
+ *         "source name" is present, or if no source matches "source name"
+ */
+utils::Identifier YamlConnectionParser::getSourceUUIDFromYaml() const {
+  const YAML::Node source_id_node = connectionNode_["source id"];
+  if (source_id_node) {
+    const std::string source_id = source_id_node.as<std::string>();
+    const utils::optional<utils::Identifier> srcUUID = utils::Identifier::parse(source_id);
+    if (srcUUID) {
+      logger_->log_debug("Using 'source id' to match source with same id for connection '%s': source id => [%s]", name_, srcUUID.value().to_string());
+      return srcUUID.value();
+    }
+    logger_->log_error("Invalid source id value: %s.", source_id);
+    throw std::invalid_argument("Invalid source id");
+  }
+  // if we don't have a source id, try to resolve using source name. config schema v2 will make this unnecessary
+  checkRequiredField(&connectionNode_, "source name", logger_, CONFIG_YAML_CONNECTIONS_KEY);
+  const std::string connectionSrcProcName = connectionNode_["source name"].as<std::string>();
+  const utils::optional<utils::Identifier> srcUUID = utils::Identifier::parse(connectionSrcProcName);
+  if (srcUUID && parent_->findProcessorById(srcUUID.value())) {
+    // the source name is a remote port id, so use that as the source id
+    logger_->log_debug("Using 'source name' containing a remote port id to match the source for connection '%s': source name => [%s]", name_, connectionSrcProcName);
+    return srcUUID.value();
+  }
+  // lastly, look the processor up by name
+  auto srcProcessor = parent_->findProcessorByName(connectionSrcProcName);
+  if (nullptr != srcProcessor) {
+    logger_->log_debug("Using 'source name' to match source with same name for connection '%s': source name => [%s]", name_, connectionSrcProcName);
+    return srcProcessor->getUUID();
+  }
+  // we ran out of ways to discover the source processor
+  const std::string error_msg = "Could not locate a source with name " + connectionSrcProcName + " to create a connection ";
+  // The message embeds the name read from the configuration, so it is logged as an argument
+  // rather than used as the format string
+  logger_->log_error("%s", error_msg);
+  throw std::invalid_argument(error_msg);
+}
+
+// Resolves the UUID of the connection's destination from the connection's YAML node.
+//
+// Resolution order:
+//   1. "destination id", parsed as a UUID (a value that does not parse is an error);
+//   2. "destination name" parsed as a UUID, if the parent group knows a processor with that id;
+//   3. "destination name" looked up as a processor name in the parent group.
+//
+// Throws std::invalid_argument if "destination id" is present but invalid, or if nothing
+// matching "destination name" can be found in the parent group.
+utils::Identifier YamlConnectionParser::getDestinationUUIDFromYaml() const {
+  const YAML::Node destination_id_node = connectionNode_["destination id"];
+  if (destination_id_node) {
+    // convert once; the value is needed both for parsing and for the error log
+    const std::string destination_id_str = destination_id_node.as<std::string>();
+    const utils::optional<utils::Identifier> destUUID = utils::Identifier::parse(destination_id_str);
+    if (destUUID) {
+      logger_->log_debug("Using 'destination id' to match destination with same id for connection '%s': destination id => [%s]",
+          name_, destUUID.value().to_string());
+      return destUUID.value();
+    }
+    logger_->log_error("Invalid destination id value: %s.", destination_id_str);
+    throw std::invalid_argument("Invalid destination id");
+  }
+  // without a destination id, fall back to "destination name": first interpreted as an id,
+  // then as a processor name
+  // NOTE(review): checkRequiredField is presumably what rejects a connection lacking "destination name" - confirm
+  checkRequiredField(&connectionNode_, "destination name", logger_, CONFIG_YAML_CONNECTIONS_KEY);
+  const std::string connectionDestProcName = connectionNode_["destination name"].as<std::string>();
+  const utils::optional<utils::Identifier> destUUID = utils::Identifier::parse(connectionDestProcName);
+  if (destUUID && parent_->findProcessorById(destUUID.value())) {
+    // the destination name is a remote port id, so use that as the dest id
+    logger_->log_debug("Using 'destination name' containing a remote port id to match the destination for connection '%s': destination name => [%s]",
+        name_, connectionDestProcName);
+    return destUUID.value();
+  }
+  // look the processor up by name
+  const auto destProcessor = parent_->findProcessorByName(connectionDestProcName);
+  if (nullptr != destProcessor) {
+    logger_->log_debug("Using 'destination name' to match destination with same name for connection '%s': destination name => [%s]",
+        name_, connectionDestProcName);
+    return destProcessor->getUUID();
+  }
+  // we ran out of ways to discover the destination processor
+  const std::string error_msg = "Could not locate a destination with name " + connectionDestProcName + " to create a connection";
+  // the message embeds a user-supplied name, so it must be logged as an argument, never as the format string
+  logger_->log_error("%s", error_msg);
+  throw std::invalid_argument(error_msg);
+}
+
+// Reads the optional "flowfile expiration" setting of the connection.
+//
+// Returns 0 when the key is absent; otherwise returns the duration after it has been passed
+// through core::Property::StringToTime and core::Property::ConvertTimeUnitToMS
+// (presumably milliseconds, going by the converter's name).
+// A value that fails to parse does not throw; see the comment in the failure branch.
+uint64_t YamlConnectionParser::getFlowFileExpirationFromYaml() const {
+  const YAML::Node expiration_node = connectionNode_["flowfile expiration"];
+  if (!expiration_node) {
+    logger_->log_debug("parseConnection: flowfile expiration is not set, assuming 0 (never expire)");
+    return 0;
+  }
+  uint64_t expirationDuration = 0;
+  TimeUnit unit;
+  const std::string flowfile_expiration_str = expiration_node.as<std::string>();
+  if (!core::Property::StringToTime(flowfile_expiration_str, expirationDuration, unit)
+      || !core::Property::ConvertTimeUnitToMS(expirationDuration, unit, expirationDuration)) {
+    // We should throw here, but we do not.
+    // The reason is that our parser only accepts time formats that consist of a number and
+    // a unit, but users might use this field populated with a "0" (and no units).
+    // We cannot correct this, because there is no API contract for the config, we need to support
+    // all already-supported configuration files.
+    // This has the side-effect of allowing values like "20 minuites" and silently defaulting to 0.
+    logger_->log_debug("Parsing failure for flowfile expiration duration");
+  }
+  // expirationDuration is a uint64_t: it has to be printed with PRIu64, "%d" would mismatch the argument type
+  logger_->log_debug("parseConnection: flowfile expiration => [%" PRIu64 "]", expirationDuration);
+  return expirationDuration;
+}
+
+// Reads the optional "drop empty" flag of the connection.
+// Yields true only when the key is present, its text is accepted by StringToBool,
+// and the parsed value is true; every other case yields false.
+bool YamlConnectionParser::getDropEmptyFromYaml() const {
+  const YAML::Node flag_node = connectionNode_["drop empty"];
+  if (!flag_node) {
+    return false;
+  }
+  bool parsed_flag = false;
+  if (!utils::StringUtils::StringToBool(flag_node.as<std::string>(), parsed_flag)) {
+    return false;
+  }
+  return parsed_flag;
+}
+
+}  // namespace yaml
+}  // namespace core
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/main/MainHelper.h b/main/MainHelper.h
index ea45833..247bea0 100644
--- a/main/MainHelper.h
+++ b/main/MainHelper.h
@@ -45,13 +45,6 @@ extern "C" {
 //! Define home environment variable
 #define MINIFI_HOME_ENV_KEY "MINIFI_HOME"
 
-/* Define Parser Values for Configuration YAML sections */
-#define CONFIG_YAML_PROCESSORS_KEY "Processors"
-#define CONFIG_YAML_FLOW_CONTROLLER_KEY "Flow Controller"
-#define CONFIG_YAML_CONNECTIONS_KEY "Connections"
-#define CONFIG_YAML_REMOTE_PROCESSING_GROUPS_KEY "Remote Processing Groups"
-
-
 #ifdef _MSC_VER
 #ifndef PATH_MAX
 #define PATH_MAX 260

Reply via email to