This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 4389b9ac0 MINIFICPP-1962 Implement communication between process group
through ports
4389b9ac0 is described below
commit 4389b9ac05e43e1dbf908e9014787d07c8a9cd05
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Tue Nov 29 10:46:26 2022 +0100
MINIFICPP-1962 Implement communication between process group through ports
Closes #1451
Signed-off-by: Marton Szasz <[email protected]>
---
.../standard-processors/tests/CMakeLists.txt | 2 +-
.../tests/integration/ProcessGroupTest.cpp | 63 +++++++
.../tests/unit/ProcessGroupTestUtils.h | 154 ++++++++++++++---
.../tests/unit/YamlProcessGroupParserTests.cpp | 182 +++++++++++++++++++--
.../include/{core/Funnel.h => ForwardingNode.h} | 22 +--
.../{src/core/Funnel.cpp => include/Funnel.h} | 30 ++--
libminifi/include/Port.h | 47 ++++++
libminifi/include/core/ProcessGroup.h | 27 ++-
libminifi/include/core/yaml/YamlConfiguration.h | 30 ++--
.../src/{core/Funnel.cpp => ForwardingNode.cpp} | 12 +-
libminifi/src/core/ProcessGroup.cpp | 87 +++++++++-
libminifi/src/core/yaml/YamlConfiguration.cpp | 44 ++++-
libminifi/src/core/yaml/YamlConnectionParser.cpp | 5 +-
libminifi/test/resources/TestProcessGroup.yml | 69 ++++++++
14 files changed, 666 insertions(+), 108 deletions(-)
diff --git a/extensions/standard-processors/tests/CMakeLists.txt
b/extensions/standard-processors/tests/CMakeLists.txt
index 01e162de2..784de1824 100644
--- a/extensions/standard-processors/tests/CMakeLists.txt
+++ b/extensions/standard-processors/tests/CMakeLists.txt
@@ -97,8 +97,8 @@ if(NOT OPENSSL_OFF)
endif()
add_test(NAME TailFileTest COMMAND TailFileTest
"${TEST_RESOURCES}/TestTailFile.yml" "${TEST_RESOURCES}/")
-
add_test(NAME TailFileCronTest COMMAND TailFileTest
"${TEST_RESOURCES}/TestTailFileCron.yml" "${TEST_RESOURCES}/")
+add_test(NAME ProcessGroupTest COMMAND ProcessGroupTest
"${TEST_RESOURCES}/TestProcessGroup.yml")
FOREACH(resourcefile ${RESOURCE_APPS})
get_filename_component(resourcefilename "${resourcefile}" NAME_WE)
diff --git
a/extensions/standard-processors/tests/integration/ProcessGroupTest.cpp
b/extensions/standard-processors/tests/integration/ProcessGroupTest.cpp
new file mode 100644
index 000000000..5e8358686
--- /dev/null
+++ b/extensions/standard-processors/tests/integration/ProcessGroupTest.cpp
@@ -0,0 +1,63 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <cassert>
+#include <string>
+
+#include "core/logging/Logger.h"
+#include "FlowController.h"
+#include "TestBase.h"
+#include "Catch.h"
+#include "processors/GenerateFlowFile.h"
+#include "processors/LogAttribute.h"
+#include "processors/UpdateAttribute.h"
+#include "integration/IntegrationBase.h"
+#include "utils/IntegrationTestUtils.h"
+
+using namespace std::literals::chrono_literals;
+
+class ProcessGroupTestHarness : public IntegrationBase {  // integration-test harness: sets log levels, then asserts on one expected log line
+ public:
+ ProcessGroupTestHarness() : IntegrationBase(2s) {  // 2s is handed to IntegrationBase; presumably the wait time read back as wait_time_ below - TODO confirm
+ }
+
+ void testSetup() override {  // log levels: LogAttribute at info, GenerateFlowFile and UpdateAttribute at trace
+
LogTestController::getInstance().setInfo<minifi::processors::LogAttribute>();
+
LogTestController::getInstance().setTrace<minifi::processors::GenerateFlowFile>();
+
LogTestController::getInstance().setTrace<minifi::processors::UpdateAttribute>();
+ }
+
+ void runAssertions() override {  // asserts that the expected attribute line is found in the log within wait_time_
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+
assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(wait_time_),
+ "key:test_attribute value:success"));
+ }
+};
+
+int main(int argc, char **argv) {  // argv[1], when present, is forwarded to the harness as the test file location
+ std::string test_file_location;
+ if (argc > 1) {
+ test_file_location = argv[1];
+ }
+
+ ProcessGroupTestHarness harness;
+ harness.run(test_file_location);  // receives an empty string when no argument was given
+
+ return 0;
+}
diff --git a/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h
b/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h
index 58a0dacff..bb9d1673b 100644
--- a/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h
+++ b/extensions/standard-processors/tests/unit/ProcessGroupTestUtils.h
@@ -60,9 +60,21 @@ struct Lines {
}
};
+enum class ConnectionFailure {  // why a test connection is expected to be rejected; assertFailure maps each value to one log message
+ UNRESOLVED_SOURCE,  // "Cannot find the source processor with id ..."
+ UNRESOLVED_DESTINATION,  // "Cannot find the destination processor with id ..."
+ INPUT_CANNOT_BE_SOURCE,  // input port used as a source outside the process group
+ OUTPUT_CANNOT_BE_DESTINATION,  // output port used as a destination outside the process group
+ INPUT_CANNOT_BE_DESTINATION,  // input port used as a destination inside the process group
+ OUTPUT_CANNOT_BE_SOURCE  // output port used as a source inside the process group
+};
+
struct Proc {
+ Proc(std::string id, std::string name, const
std::optional<ConnectionFailure>& failure = std::nullopt)
+ : id(std::move(id)), name(std::move(name)), failure(failure) {}
std::string id;
std::string name;
+ std::optional<ConnectionFailure> failure;
Lines serialize() const {
return {{
@@ -73,17 +85,33 @@ struct Proc {
}
};
-struct UnresolvedProc {
- explicit UnresolvedProc(std::string id): id(std::move(id)) {}
+template<typename Tag>  // Tag is not used in the body; it only makes InputPort and OutputPort distinct types
+struct Port {
+ Port(std::string id, std::string name, const
std::optional<ConnectionFailure>& failure = std::nullopt)
+ : id(std::move(id)), name(std::move(name)), failure(failure) {}
std::string id;
+ std::string name;
+ std::optional<ConnectionFailure> failure;  // set when a connection using this port is expected to fail with that error
+
+ Lines serialize() const {  // emits the id entry followed by the name entry for this port
+ return {{
+ "- id: " + id,
+ " name: " + name
+ }};
+ }
};
+using InputPort = Port<struct InputTag>;
+using OutputPort = Port<struct OutputTag>;
+
struct MaybeProc {
- MaybeProc(const Proc& proc): id(proc.id), name(proc.name) {} // NOLINT
- MaybeProc(const UnresolvedProc& proc) : id(proc.id) {} // NOLINT
+ MaybeProc(const Proc& proc) : id(proc.id), name(proc.name),
failure(proc.failure) {} // NOLINT(runtime/explicit)
+ MaybeProc(const InputPort& port) : id(port.id), name(port.name),
failure(port.failure) {} // NOLINT(runtime/explicit)
+ MaybeProc(const OutputPort& port) : id(port.id), name(port.name),
failure(port.failure) {} // NOLINT(runtime/explicit)
std::string id;
- std::optional<std::string> name;
+ std::string name;
+ std::optional<ConnectionFailure> failure;
};
struct Conn {
@@ -139,6 +167,14 @@ struct Group {
rpgs_ = std::move(rpgs);
return *this;
}
+ Group& With(std::vector<InputPort> input_ports) {  // replaces the group's input ports; returns *this so calls can be chained
+ input_ports_ = std::move(input_ports);
+ return *this;
+ }
+ Group& With(std::vector<OutputPort> output_ports) {  // replaces the group's output ports; returns *this so calls can be chained
+ output_ports_ = std::move(output_ports);
+ return *this;
+ }
Lines serialize(bool is_root = true) const {
Lines body;
if (processors_.empty()) {
@@ -169,6 +205,22 @@ struct Group {
body.append(subgroup.serialize(false).indentAll());
}
}
+ if (input_ports_.empty()) {
+ body.emplace_back("Input Ports: []");
+ } else {
+ body.emplace_back("Input Ports:");
+ for (const auto& port : input_ports_) {
+ body.append(port.serialize().indentAll());
+ }
+ }
+ if (output_ports_.empty()) {
+ body.emplace_back("Output Ports: []");
+ } else {
+ body.emplace_back("Output Ports:");
+ for (const auto& port : output_ports_) {
+ body.append(port.serialize().indentAll());
+ }
+ }
Lines lines;
if (is_root) {
lines.emplace_back("Flow Controller:");
@@ -186,12 +238,15 @@ struct Group {
std::vector<Proc> processors_;
std::vector<Group> subgroups_;
std::vector<RPG> rpgs_;
+ std::vector<InputPort> input_ports_;
+ std::vector<OutputPort> output_ports_;
};
struct ProcessGroupTestAccessor {
FIELD_ACCESSOR(processors_)
FIELD_ACCESSOR(connections_)
FIELD_ACCESSOR(child_process_groups_)
+ FIELD_ACCESSOR(ports_)
};
template<typename T, typename = void>
@@ -222,6 +277,54 @@ auto findByName(const std::set<T>& set, const std::string&
name) -> decltype(Res
return nullptr;
}
+void assertFailure(const Conn& expected, ConnectionFailure failure) {
+ auto assertMessage = [](const std::string& message) {
+ REQUIRE(utils::verifyLogLinePresenceInPollTime(std::chrono::seconds{1},
message));
+ };
+
+ switch (failure) {
+ case ConnectionFailure::UNRESOLVED_DESTINATION: {
+ assertMessage("Cannot find the destination processor with id '" +
expected.destination.id + "' for the connection [name = '" + expected.name +
"'");
+ break;
+ }
+ case ConnectionFailure::UNRESOLVED_SOURCE: {
+ assertMessage("Cannot find the source processor with id '" +
expected.source.id + "' for the connection [name = '" + expected.name + "'");
+ break;
+ }
+ case ConnectionFailure::INPUT_CANNOT_BE_SOURCE: {
+ assertMessage("Input port [id = '" + expected.source.id + "'] cannot be
a source outside the process group in the connection [name = '" + expected.name
+ "'");
+ break;
+ }
+ case ConnectionFailure::OUTPUT_CANNOT_BE_DESTINATION: {
+ assertMessage("Output port [id = '" + expected.destination.id + "']
cannot be a destination outside the process group in the connection [name = '"
+ expected.name + "'");
+ break;
+ }
+ case ConnectionFailure::INPUT_CANNOT_BE_DESTINATION: {
+ assertMessage("Input port [id = '" + expected.destination.id + "']
cannot be a destination inside the process group in the connection [name = '" +
expected.name + "'");
+ break;
+ }
+ case ConnectionFailure::OUTPUT_CANNOT_BE_SOURCE: {
+ assertMessage("Output port [id = '" + expected.source.id + "'] cannot be
a source inside the process group in the connection [name = '" + expected.name
+ "'");
+ break;
+ }
+ }
+}
+
+// Verifies both endpoints of `conn` against the pattern `expected`: an endpoint
+// with an expected failure must be unset and the matching error must be logged;
+// otherwise the endpoint must exist and carry the expected name.
+// Declared inline because the definition lives in a header.
+inline void verifyConnectionNode(minifi::Connection* conn, const Conn& expected) {
+ if (expected.source.failure) {
+ REQUIRE(conn->getSource() == nullptr);
+ assertFailure(expected, *expected.source.failure);
+ } else {
+ REQUIRE(conn->getSource() != nullptr);  // fail the test instead of dereferencing a null endpoint
+ REQUIRE(conn->getSource()->getName() == expected.source.name);
+ }
+ if (expected.destination.failure) {
+ REQUIRE(conn->getDestination() == nullptr);
+ assertFailure(expected, *expected.destination.failure);
+ } else {
+ REQUIRE(conn->getDestination() != nullptr);  // fail the test instead of dereferencing a null endpoint
+ REQUIRE(conn->getDestination()->getName() == expected.destination.name);
+ }
+}
+
void verifyProcessGroup(core::ProcessGroup& group, const Group& pattern) {
// verify name
REQUIRE(group.getName() == pattern.name_);
@@ -231,33 +334,34 @@ void verifyProcessGroup(core::ProcessGroup& group, const
Group& pattern) {
for (auto& expected : pattern.connections_) {
auto conn = findByName(connections, expected.name);
REQUIRE(conn);
- if (!expected.source.name) {
- REQUIRE(conn->getSource() == nullptr);
- REQUIRE(utils::verifyLogLinePresenceInPollTime(
- std::chrono::seconds{1},
- "Cannot find the source processor with id '" + expected.source.id
- + "' for the connection [name = '" + expected.name + "'"));
- } else {
- REQUIRE(conn->getSource()->getName() == expected.source.name);
- }
- if (!expected.destination.name) {
- REQUIRE(conn->getDestination() == nullptr);
- REQUIRE(utils::verifyLogLinePresenceInPollTime(
- std::chrono::seconds{1},
- "Cannot find the destination processor with id '" +
expected.destination.id
- + "' for the connection [name = '" + expected.name + "'"));
- } else {
- REQUIRE(conn->getDestination()->getName() == expected.destination.name);
- }
+ verifyConnectionNode(conn, expected);
}
- // verify processors
+ // verify processors and ports
const auto& processors = ProcessGroupTestAccessor::get_processors_(group);
- REQUIRE(processors.size() == pattern.processors_.size());
+ REQUIRE(processors.size() == pattern.processors_.size() +
pattern.input_ports_.size() + pattern.output_ports_.size());
for (auto& expected : pattern.processors_) {
REQUIRE(findByName(processors, expected.name));
}
+ for (auto& expected : pattern.input_ports_) {
+ REQUIRE(findByName(processors, expected.name));
+ }
+
+ for (auto& expected : pattern.output_ports_) {
+ REQUIRE(findByName(processors, expected.name));
+ }
+
+ const auto& ports = ProcessGroupTestAccessor::get_ports_(group);
+ REQUIRE(ports.size() == pattern.input_ports_.size() +
pattern.output_ports_.size());
+ for (auto& expected : pattern.input_ports_) {
+ REQUIRE(findByName(ports, expected.name));
+ }
+
+ for (auto& expected : pattern.output_ports_) {
+ REQUIRE(findByName(ports, expected.name));
+ }
+
std::set<core::ProcessGroup*> simple_subgroups;
std::set<core::ProcessGroup*> rpg_subgroups;
for (auto& subgroup :
ProcessGroupTestAccessor::get_child_process_groups_(group)) {
diff --git
a/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
b/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
index 52e264d5f..0f5879587 100644
--- a/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
+++ b/extensions/standard-processors/tests/unit/YamlProcessGroupParserTests.cpp
@@ -24,7 +24,7 @@
static core::YamlConfiguration config(nullptr, nullptr, nullptr, nullptr,
std::make_shared<minifi::Configure>());
-TEST_CASE("Root process group is correctly parsed",
"[YamlProcessGroupParser1]") {
+TEST_CASE("Root process group is correctly parsed",
"[YamlProcessGroupParser]") {
auto pattern = Group("root")
.With({
Conn{"Conn1",
@@ -48,7 +48,7 @@ TEST_CASE("Root process group is correctly parsed",
"[YamlProcessGroupParser1]")
verifyProcessGroup(*root, pattern);
}
-TEST_CASE("Nested process group is correctly parsed",
"[YamlProcessGroupParser2]") {
+TEST_CASE("Nested process group is correctly parsed",
"[YamlProcessGroupParser]") {
auto pattern = Group("root")
.With({Conn{"Conn1",
Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
@@ -71,7 +71,7 @@ TEST_CASE("Nested process group is correctly parsed",
"[YamlProcessGroupParser2]
verifyProcessGroup(*root, pattern);
}
-TEST_CASE("Cannot connect processors from different groups",
"[YamlProcessGroupParser3]") {
+TEST_CASE("Cannot connect processors from different groups",
"[YamlProcessGroupParser]") {
TestController controller;
LogTestController::getInstance().setTrace<core::YamlConfiguration>();
Proc Proc1{"00000000-0000-0000-0000-000000000001", "Proc1"};
@@ -102,18 +102,18 @@ TEST_CASE("Cannot connect processors from different
groups", "[YamlProcessGroupP
}
SECTION("Connecting processors in their child/parent group") {
- Conn1.source = UnresolvedProc{Child1_Proc1.id};
- Conn1.destination = UnresolvedProc{Child1_Port1.id};
+ Conn1.source = Proc{Child1_Proc1.id, Child1_Proc1.name,
ConnectionFailure::UNRESOLVED_SOURCE};
+ Conn1.destination = Proc{Child1_Port1.id, Child1_Port1.name,
ConnectionFailure::UNRESOLVED_DESTINATION};
- Child1_Conn1.source = UnresolvedProc{Proc1.id};
- Child1_Conn1.destination = UnresolvedProc{Port1.id};
+ Child1_Conn1.source = Proc{Proc1.id, Proc1.name,
ConnectionFailure::UNRESOLVED_SOURCE};
+ Child1_Conn1.destination = Proc{Port1.id, Port1.name,
ConnectionFailure::UNRESOLVED_DESTINATION};  // id and name both come from Port1, the endpoint this entry describes
}
SECTION("Connecting processors between their own and their child/parent
group") {
Conn1.source = Proc1;
- Conn1.destination = UnresolvedProc{Child1_Port1.id};
+ Conn1.destination = Proc{Child1_Port1.id, Child1_Port1.name,
ConnectionFailure::UNRESOLVED_DESTINATION};
- Child1_Conn1.source = UnresolvedProc{Port1.id};
+ Child1_Conn1.source = Proc{Port1.id, Port1.name,
ConnectionFailure::UNRESOLVED_SOURCE};
Child1_Conn1.destination = Child1_Proc1;
}
@@ -121,11 +121,171 @@ TEST_CASE("Cannot connect processors from different
groups", "[YamlProcessGroupP
Conn1.source = Proc1;
Conn1.destination = Port1;
- Child1_Conn1.source = UnresolvedProc{Child2_Proc1.id};
- Child1_Conn1.destination = UnresolvedProc{Child2_Port1.id};
+ Child1_Conn1.source = Proc{Child2_Proc1.id, Child2_Proc1.name,
ConnectionFailure::UNRESOLVED_SOURCE};
+ Child1_Conn1.destination = Proc{Child2_Port1.id, Child2_Port1.name,
ConnectionFailure::UNRESOLVED_DESTINATION};
}
auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
verifyProcessGroup(*root, pattern);
}
+
+TEST_CASE("Processor can communicate with child process group's input port",
"[YamlProcessGroupParser]") {
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
+ InputPort{"00000000-0000-0000-0000-000000000002", "Port1"}}})
+ .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+ .With({
+ Group("Child1")
+ .With({InputPort{"00000000-0000-0000-0000-000000000002", "Port1"}})
+ });
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Child process group can provide input for root processor through
output port", "[YamlProcessGroupParser]") {
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ OutputPort{"00000000-0000-0000-0000-000000000002", "Port1"},
+ Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}}})
+ .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+ .With({
+ Group("Child1")
+ .With({OutputPort{"00000000-0000-0000-0000-000000000002", "Port1"}})
+ });
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Child process groups can communicate through ports",
"[YamlProcessGroupParser]") {
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ OutputPort{"00000000-0000-0000-0000-000000000002", "Port1"},
+ InputPort{"00000000-0000-0000-0000-000000000003", "Port2"}}})
+ .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+ .With({
+ Group("Child1")
+ .With({OutputPort{"00000000-0000-0000-0000-000000000002", "Port1"}}),
+ Group("Child2")
+ .With({InputPort{"00000000-0000-0000-0000-000000000003", "Port2"}})
+ });
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Processor cannot communicate with child's nested process group",
"[YamlProcessGroupParser]") {
+ Proc Proc1{"00000000-0000-0000-0000-000000000001", "Proc1"};
+ OutputPort Port1{"00000000-0000-0000-0000-000000000002", "Port1"};
+ InputPort Port2{"00000000-0000-0000-0000-000000000003", "Port2",
ConnectionFailure::UNRESOLVED_DESTINATION};
+
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ Proc1,
+ Port2}})
+ .With({Proc1})
+ .With({
+ Group("Child1")
+ .With({Port1})
+ .With({Group("Child2")
+ .With({Port2})})
+ });
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
+
+// Tagged like every other case in this file so that a "[YamlProcessGroupParser]" filter selects it too.
+TEST_CASE("Input port can be a connection's source and the output port can be
a destination inside the process group", "[YamlProcessGroupParser]") {
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ InputPort{"00000000-0000-0000-0000-000000000001", "Port1"},
+ OutputPort{"00000000-0000-0000-0000-000000000002", "Port2"}}})
+ .With({InputPort{"00000000-0000-0000-0000-000000000001", "Port1"}})
+ .With({OutputPort{"00000000-0000-0000-0000-000000000002", "Port2"}});
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Input port cannot be a connection's destination inside the process
group", "[YamlProcessGroupParser]") {
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ Proc{"00000000-0000-0000-0000-000000000002", "Proc1"},
+ InputPort{"00000000-0000-0000-0000-000000000001", "Port1",
ConnectionFailure::INPUT_CANNOT_BE_DESTINATION}}})
+ .With({InputPort{"00000000-0000-0000-0000-000000000001", "Port1"}})
+ .With({Proc{"00000000-0000-0000-0000-000000000002", "Proc1"}});
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Output port cannot be a connection's source inside the process
group", "[YamlProcessGroupParser]") {
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ OutputPort{"00000000-0000-0000-0000-000000000001", "Port1",
ConnectionFailure::OUTPUT_CANNOT_BE_SOURCE},
+ Proc{"00000000-0000-0000-0000-000000000002", "Proc1"}}})
+ .With({OutputPort{"00000000-0000-0000-0000-000000000001", "Port1"}})
+ .With({Proc{"00000000-0000-0000-0000-000000000002", "Proc1"}});
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Input port can be a connection's source and the output port can be
a destination inside the process group through processor",
"[YamlProcessGroupParser]") {
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ InputPort{"00000000-0000-0000-0000-000000000001", "Port1"},
+ Proc{"00000000-0000-0000-0000-000000000003", "Proc1"}},
+ Conn{"Conn2",
+ Proc{"00000000-0000-0000-0000-000000000003", "Proc1"},
+ OutputPort{"00000000-0000-0000-0000-000000000002", "Port2"}}})
+ .With({Proc{"00000000-0000-0000-0000-000000000003", "Proc1"}})
+ .With({InputPort{"00000000-0000-0000-0000-000000000001", "Port1"}})
+ .With({OutputPort{"00000000-0000-0000-0000-000000000002", "Port2"}});
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Processor cannot set connection's destination to child process
group's output port", "[YamlProcessGroupParser]") {
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ Proc{"00000000-0000-0000-0000-000000000001", "Proc1"},
+ OutputPort{"00000000-0000-0000-0000-000000000002", "Port1",
ConnectionFailure::OUTPUT_CANNOT_BE_DESTINATION}}})
+ .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+ .With({
+ Group("Child1")
+ .With({OutputPort{"00000000-0000-0000-0000-000000000002", "Port1"}})
+ });
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
+
+TEST_CASE("Processor cannot set connection's source to child process group's
input port", "[YamlProcessGroupParser]") {
+ auto pattern = Group("root")
+ .With({Conn{"Conn1",
+ InputPort{"00000000-0000-0000-0000-000000000002", "Port1",
ConnectionFailure::INPUT_CANNOT_BE_SOURCE},
+ Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}}})
+ .With({Proc{"00000000-0000-0000-0000-000000000001", "Proc1"}})
+ .With({
+ Group("Child1")
+ .With({InputPort{"00000000-0000-0000-0000-000000000002", "Port1"}})
+ });
+
+ auto root = config.getRootFromPayload(pattern.serialize().join("\n"));
+
+ verifyProcessGroup(*root, pattern);
+}
diff --git a/libminifi/include/core/Funnel.h
b/libminifi/include/ForwardingNode.h
similarity index 68%
rename from libminifi/include/core/Funnel.h
rename to libminifi/include/ForwardingNode.h
index 8c8e08c52..d492d3493 100644
--- a/libminifi/include/core/Funnel.h
+++ b/libminifi/include/ForwardingNode.h
@@ -17,35 +17,35 @@
*/
#pragma once
-#include <memory>
#include <string>
+#include <memory>
#include <utility>
-#include "logging/LoggerFactory.h"
-#include "Processor.h"
+#include "core/logging/LoggerFactory.h"
+#include "core/Processor.h"
-namespace org::apache::nifi::minifi::core {
+namespace org::apache::nifi::minifi {
-class Funnel final : public Processor {
+class ForwardingNode : public core::Processor {
public:
- Funnel(std::string name, const utils::Identifier& uuid) :
Processor(std::move(name), uuid),
logger_(logging::LoggerFactory<Funnel>::getLogger()) {}
- explicit Funnel(std::string name) : Processor(std::move(name)),
logger_(logging::LoggerFactory<Funnel>::getLogger()) {}
+ ForwardingNode(std::string name, const utils::Identifier& uuid,
std::shared_ptr<core::logging::Logger> logger) : Processor(std::move(name),
uuid), logger_(std::move(logger)) {
+ strategy_ = core::SchedulingStrategy::EVENT_DRIVEN;
+ }
+ // Name-only variant. Sets the same event-driven scheduling strategy as the
+ // uuid-taking constructor so both construction paths yield the same default.
+ ForwardingNode(std::string name, std::shared_ptr<core::logging::Logger>
logger) : Processor(std::move(name)), logger_(std::move(logger)) {
+ strategy_ = core::SchedulingStrategy::EVENT_DRIVEN;
+ }
static auto properties() { return std::array<core::Property, 0>{}; }
MINIFIAPI static const core::Relationship Success;
static auto relationships() { return std::array{Success}; }
MINIFIAPI static constexpr bool SupportsDynamicProperties = false;
MINIFIAPI static constexpr bool SupportsDynamicRelationships = false;
- MINIFIAPI static constexpr core::annotation::Input InputRequirement =
core::annotation::Input::INPUT_REQUIRED;
MINIFIAPI static constexpr bool IsSingleThreaded = false;
- ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
void initialize() override;
void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSession>& session) override;
private:
- std::shared_ptr<logging::Logger> logger_;
+ std::shared_ptr<core::logging::Logger> logger_;
};
-} // namespace org::apache::nifi::minifi::core
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/core/Funnel.cpp b/libminifi/include/Funnel.h
similarity index 55%
copy from libminifi/src/core/Funnel.cpp
copy to libminifi/include/Funnel.h
index 5d3119470..e17689f5c 100644
--- a/libminifi/src/core/Funnel.cpp
+++ b/libminifi/include/Funnel.h
@@ -1,4 +1,5 @@
/**
+ *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,25 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+#pragma once
-#include "core/Funnel.h"
-#include "core/ProcessSession.h"
+#include <string>
+#include <utility>
-namespace org::apache::nifi::minifi::core {
+#include "ForwardingNode.h"
-const Relationship Funnel::Success("success", "FlowFiles are routed to success
relationship");
+namespace org::apache::nifi::minifi {
-void Funnel::initialize() {
- setSupportedRelationships(relationships());
-}
+class Funnel final : public ForwardingNode {
+ public:
+ Funnel(std::string name, const utils::Identifier& uuid) :
ForwardingNode(std::move(name), uuid,
core::logging::LoggerFactory<Funnel>::getLogger()) {}
+ explicit Funnel(std::string name) : ForwardingNode(std::move(name),
core::logging::LoggerFactory<Funnel>::getLogger()) {}
-void Funnel::onTrigger(const std::shared_ptr<core::ProcessContext>&
/*context*/, const std::shared_ptr<core::ProcessSession>& session) {
- logger_->log_trace("On trigger %s", getUUIDStr());
- std::shared_ptr<core::FlowFile> flow_file = session->get();
- if (!flow_file) {
- return;
- }
- session->transfer(flow_file, Success);
-}
+ MINIFIAPI static constexpr core::annotation::Input InputRequirement =
core::annotation::Input::INPUT_REQUIRED;
+ ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+};
-} // namespace org::apache::nifi::minifi::core
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/Port.h b/libminifi/include/Port.h
new file mode 100644
index 000000000..65c358b9c
--- /dev/null
+++ b/libminifi/include/Port.h
@@ -0,0 +1,47 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <utility>
+
+#include "ForwardingNode.h"
+
+namespace org::apache::nifi::minifi {
+
+enum class PortType {  // kind of port; stored in Port and returned by Port::getPortType()
+ INPUT,
+ OUTPUT
+};
+
+class Port final : public ForwardingNode {  // ForwardingNode that additionally records whether it is an input or an output port
+ public:
+ Port(std::string name, const utils::Identifier& uuid, PortType port_type) :
ForwardingNode(std::move(name), uuid,
core::logging::LoggerFactory<Port>::getLogger()), port_type_(port_type) {}
+ Port(std::string name, PortType port_type) : ForwardingNode(std::move(name),
core::logging::LoggerFactory<Port>::getLogger()), port_type_(port_type) {}
+ PortType getPortType() const {
+ return port_type_;
+ }
+
+ MINIFIAPI static constexpr core::annotation::Input InputRequirement =
core::annotation::Input::INPUT_ALLOWED;  // declared as INPUT_ALLOWED for ports
+ ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+
+ private:
+ PortType port_type_;  // set by the constructors; read through getPortType()
+};
+
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/core/ProcessGroup.h
b/libminifi/include/core/ProcessGroup.h
index 76a0c9259..96dd68438 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -27,19 +27,21 @@
#include <algorithm>
#include <set>
#include <utility>
+#include <tuple>
#include "Processor.h"
-#include "Funnel.h"
#include "Exception.h"
#include "TimerDrivenSchedulingAgent.h"
#include "EventDrivenSchedulingAgent.h"
#include "CronDrivenSchedulingAgent.h"
+#include "Port.h"
#include "core/logging/Logger.h"
#include "controller/ControllerServiceNode.h"
#include "controller/ControllerServiceMap.h"
#include "utils/Id.h"
#include "utils/BaseHTTPClient.h"
#include "utils/CallBackTimer.h"
+#include "range/v3/algorithm/find_if.hpp"
struct ProcessGroupTestAccessor;
@@ -164,18 +166,22 @@ class ProcessGroup : public CoreComponent {
std::lock_guard<std::recursive_mutex> lock(mutex_);
return parent_process_group_;
}
- // Add processor
- void addProcessor(std::unique_ptr<Processor> processor);
- // Add child processor group
+ [[maybe_unused]] std::tuple<Processor*, bool>
addProcessor(std::unique_ptr<Processor> processor);
+ void addPort(std::unique_ptr<Port> port);
void addProcessGroup(std::unique_ptr<ProcessGroup> child);
- // ! Add connections
void addConnection(std::unique_ptr<Connection> connection);
- // Generic find
+ const std::set<Port*>& getPorts() const {
+ return ports_;
+ }
+
+ Port* findPortById(const utils::Identifier& uuid) const;
+ Port* findChildPortById(const utils::Identifier& uuid) const;
+
template <typename Fun>
Processor* findProcessor(Fun condition, Traverse traverse) const {
std::lock_guard<std::recursive_mutex> lock(mutex_);
- const auto found = std::find_if(processors_.cbegin(), processors_.cend(),
condition);
- if (found != processors_.cend()) {
+ const auto found = ranges::find_if(processors_, condition);
+ if (found != ranges::end(processors_)) {
return found->get();
}
for (const auto& processGroup : child_process_groups_) {
@@ -231,9 +237,10 @@ class ProcessGroup : public CoreComponent {
int config_version_;
// Process Group Type
const ProcessGroupType type_;
- // Processors (ProcessNode) inside this process group which include
Input/Output Port, Remote Process Group input/Output port
+ // Processors (ProcessNode) inside this process group which include Remote
Process Group input/Output port
std::set<std::unique_ptr<Processor>> processors_;
std::set<Processor*> failed_processors_;
+ std::set<Port*> ports_;
std::set<std::unique_ptr<ProcessGroup>> child_process_groups_;
// Connections between the processor inside the group;
std::set<std::unique_ptr<Connection>> connections_;
@@ -259,6 +266,8 @@ class ProcessGroup : public CoreComponent {
core::controller::ControllerServiceMap controller_service_map_;
private:
+ static Port* findPortById(const std::set<Port*>& ports, const
utils::Identifier& uuid);
+
// Mutex for protection
mutable std::recursive_mutex mutex_;
// Logger
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h
b/libminifi/include/core/yaml/YamlConfiguration.h
index 18ff1ee19..bf7f280ae 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -15,8 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_CORE_YAML_YAMLCONFIGURATION_H_
-#define LIBMINIFI_INCLUDE_CORE_YAML_YAMLCONFIGURATION_H_
+#pragma once
#include <memory>
#include <optional>
@@ -37,11 +36,7 @@
class YamlConfigurationTestAccessor;
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
+namespace org::apache::nifi::minifi::core {
static constexpr char const* CONFIG_YAML_FLOW_CONTROLLER_KEY = "Flow
Controller";
static constexpr char const* CONFIG_YAML_PROCESSORS_KEY = "Processors";
@@ -50,6 +45,8 @@ static constexpr char const*
CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY = "Remote Proc
static constexpr char const* CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY_V3 = "Remote
Process Groups";
static constexpr char const* CONFIG_YAML_PROVENANCE_REPORT_KEY = "Provenance
Reporting";
static constexpr char const* CONFIG_YAML_FUNNELS_KEY = "Funnels";
+static constexpr char const* CONFIG_YAML_INPUT_PORTS_KEY = "Input Ports";
+static constexpr char const* CONFIG_YAML_OUTPUT_PORTS_KEY = "Output Ports";
#define YAML_CONFIGURATION_USE_REGEX
@@ -261,6 +258,17 @@ class YamlConfiguration : public FlowConfiguration {
*/
void parseFunnelsYaml(const YAML::Node& node, core::ProcessGroup* parent);
+ /**
+ * Parses the Input/Output Ports section of a configuration YAML.
+ * The resulting ports are added to the parent ProcessGroup.
+ *
+ * @param node the YAML::Node containing the Input/Output Ports section
+ * of the configuration YAML
+ * @param parent the root node of flow configuration to which
+ * to add the ports that are parsed
+ */
+ void parsePorts(const YAML::Node& node, core::ProcessGroup* parent, PortType
port_type);
+
/**
* A helper function for parsing or generating optional id fields.
*
@@ -320,10 +328,4 @@ class YamlConfiguration : public FlowConfiguration {
void raiseComponentError(const std::string &component_name, const
std::string &yaml_section, const std::string &reason) const;
};
-} // namespace core
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
-
-#endif // LIBMINIFI_INCLUDE_CORE_YAML_YAMLCONFIGURATION_H_
+} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/Funnel.cpp b/libminifi/src/ForwardingNode.cpp
similarity index 73%
rename from libminifi/src/core/Funnel.cpp
rename to libminifi/src/ForwardingNode.cpp
index 5d3119470..c1bf0a1f0 100644
--- a/libminifi/src/core/Funnel.cpp
+++ b/libminifi/src/ForwardingNode.cpp
@@ -15,18 +15,18 @@
* limitations under the License.
*/
-#include "core/Funnel.h"
+#include "ForwardingNode.h"
#include "core/ProcessSession.h"
-namespace org::apache::nifi::minifi::core {
+namespace org::apache::nifi::minifi {
-const Relationship Funnel::Success("success", "FlowFiles are routed to success
relationship");
+const core::Relationship ForwardingNode::Success("success", "FlowFiles are
routed to success relationship");
-void Funnel::initialize() {
+void ForwardingNode::initialize() {
setSupportedRelationships(relationships());
}
-void Funnel::onTrigger(const std::shared_ptr<core::ProcessContext>&
/*context*/, const std::shared_ptr<core::ProcessSession>& session) {
+void ForwardingNode::onTrigger(const std::shared_ptr<core::ProcessContext>&
/*context*/, const std::shared_ptr<core::ProcessSession>& session) {
logger_->log_trace("On trigger %s", getUUIDStr());
std::shared_ptr<core::FlowFile> flow_file = session->get();
if (!flow_file) {
@@ -35,4 +35,4 @@ void Funnel::onTrigger(const
std::shared_ptr<core::ProcessContext>& /*context*/,
session->transfer(flow_file, Success);
}
-} // namespace org::apache::nifi::minifi::core
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/core/ProcessGroup.cpp
b/libminifi/src/core/ProcessGroup.cpp
index b7444ecd0..7922717fb 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -91,7 +91,7 @@ bool ProcessGroup::isRemoteProcessGroup() {
}
-void ProcessGroup::addProcessor(std::unique_ptr<Processor> processor) {
+std::tuple<Processor*, bool>
ProcessGroup::addProcessor(std::unique_ptr<Processor> processor) {
gsl_Expects(processor);
const auto name = processor->getName();
std::lock_guard<std::recursive_mutex> lock(mutex_);
@@ -101,6 +101,15 @@ void ProcessGroup::addProcessor(std::unique_ptr<Processor>
processor) {
} else {
logger_->log_debug("Not adding processor %s into process group %s, as it
is already there", name, name_);
}
+ return std::make_tuple(iter->get(), inserted);
+}
+
+void ProcessGroup::addPort(std::unique_ptr<Port> port) {
+ auto [processor, inserted] = addProcessor(std::move(port));
+ if (inserted) {
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
+ ports_.insert(static_cast<Port*>(processor));
+ }
}
void ProcessGroup::addProcessGroup(std::unique_ptr<ProcessGroup> child) {
@@ -323,6 +332,33 @@ void
ProcessGroup::getFlowFileContainers(std::map<std::string, Connectable*>& co
}
}
+Port* ProcessGroup::findPortById(const std::set<Port*>& ports, const
utils::Identifier& uuid) {
+ const auto found = ranges::find_if(ports, [&](auto port) {
+ utils::Identifier port_uuid = port->getUUID();
+ return port_uuid && uuid == port_uuid;
+ });
+ if (found != ranges::cend(ports)) {
+ return *found;
+ }
+ return nullptr;
+}
+
+Port* ProcessGroup::findPortById(const utils::Identifier& uuid) const {
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
+ return findPortById(ports_, uuid);
+}
+
+Port* ProcessGroup::findChildPortById(const utils::Identifier& uuid) const {
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
+ for (const auto& processGroup : child_process_groups_) {
+ const auto& ports = processGroup->getPorts();
+ if (auto port = findPortById(ports, uuid)) {
+ return port;
+ }
+ }
+ return nullptr;
+}
+
void ProcessGroup::addConnection(std::unique_ptr<Connection> connection) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
@@ -334,19 +370,52 @@ void
ProcessGroup::addConnection(std::unique_ptr<Connection> connection) {
auto& insertedConnection = *insertPos;
logger_->log_debug("Add connection %s into process group %s",
insertedConnection->getName(), name_);
- // only allow connections between processors of the same process group
- auto source = this->findProcessorById(insertedConnection->getSourceUUID(),
Traverse::ExcludeChildren);
+ // only allow connections between processors of the same process group or
in/output ports of child process groups
+ // check input and output ports connection restrictions inside and outside a
process group
+ Processor* source = findPortById(insertedConnection->getSourceUUID());
+ if (source && static_cast<Port*>(source)->getPortType() == PortType::OUTPUT)
{
+ logger_->log_error("Output port [id = '%s'] cannot be a source inside the
process group in the connection [name = '%s', id = '%s']",
+ insertedConnection->getSourceUUID().to_string(),
insertedConnection->getName(), insertedConnection->getUUIDStr());
+ source = nullptr;
+ } else if (!source) {
+ source = findChildPortById(insertedConnection->getSourceUUID());
+ if (source && static_cast<Port*>(source)->getPortType() ==
PortType::INPUT) {
+ logger_->log_error("Input port [id = '%s'] cannot be a source outside
the process group in the connection [name = '%s', id = '%s']",
+ insertedConnection->getSourceUUID().to_string(),
insertedConnection->getName(), insertedConnection->getUUIDStr());
+ source = nullptr;
+ } else if (!source) {
+ source = findProcessorById(insertedConnection->getSourceUUID(),
Traverse::ExcludeChildren);
+ if (!source) {
+ logger_->log_error("Cannot find the source processor with id '%s' for
the connection [name = '%s', id = '%s']",
+ insertedConnection->getSourceUUID().to_string(),
insertedConnection->getName(), insertedConnection->getUUIDStr());
+ }
+ }
+ }
+
if (source) {
source->addConnection(insertedConnection.get());
- } else {
- logger_->log_error("Cannot find the source processor with id '%s' for the
connection [name = '%s', id = '%s']",
- insertedConnection->getSourceUUID().to_string(),
insertedConnection->getName(), insertedConnection->getUUIDStr());
}
- auto destination =
this->findProcessorById(insertedConnection->getDestinationUUID(),
Traverse::ExcludeChildren);
- if (!destination) {
- logger_->log_error("Cannot find the destination processor with id '%s' for
the connection [name = '%s', id = '%s']",
+
+ Processor* destination =
findPortById(insertedConnection->getDestinationUUID());
+ if (destination && static_cast<Port*>(destination)->getPortType() ==
PortType::INPUT) {
+ logger_->log_error("Input port [id = '%s'] cannot be a destination inside
the process group in the connection [name = '%s', id = '%s']",
insertedConnection->getDestinationUUID().to_string(),
insertedConnection->getName(), insertedConnection->getUUIDStr());
+ destination = nullptr;
+ } else if (!destination) {
+ destination = findChildPortById(insertedConnection->getDestinationUUID());
+ if (destination && static_cast<Port*>(destination)->getPortType() ==
PortType::OUTPUT) {
+ logger_->log_error("Output port [id = '%s'] cannot be a destination
outside the process group in the connection [name = '%s', id = '%s']",
+
insertedConnection->getDestinationUUID().to_string(),
insertedConnection->getName(), insertedConnection->getUUIDStr());
+ destination = nullptr;
+ } else if (!destination) {
+ destination =
findProcessorById(insertedConnection->getDestinationUUID(),
Traverse::ExcludeChildren);
+ if (!destination) {
+ logger_->log_error("Cannot find the destination processor with id '%s'
for the connection [name = '%s', id = '%s']",
+
insertedConnection->getDestinationUUID().to_string(),
insertedConnection->getName(), insertedConnection->getUUIDStr());
+ }
+ }
}
+
if (destination && destination != source) {
destination->addConnection(insertedConnection.get());
}
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp
b/libminifi/src/core/yaml/YamlConfiguration.cpp
index e1f23f3ac..0560d856b 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -27,6 +27,7 @@
#include "core/state/Value.h"
#include "Defaults.h"
#include "utils/TimeUtil.h"
+#include "Funnel.h"
#ifdef YAML_CONFIGURATION_USE_REGEX
#include "utils/RegexUtils.h"
@@ -93,6 +94,8 @@ std::unique_ptr<core::ProcessGroup>
YamlConfiguration::parseProcessGroupYaml(con
YAML::Node processorsNode = yamlNode[CONFIG_YAML_PROCESSORS_KEY];
YAML::Node connectionsNode =
yamlNode[yaml::YamlConnectionParser::CONFIG_YAML_CONNECTIONS_KEY];
YAML::Node funnelsNode = yamlNode[CONFIG_YAML_FUNNELS_KEY];
+ YAML::Node inputPortsNode = yamlNode[CONFIG_YAML_INPUT_PORTS_KEY];
+ YAML::Node outputPortsNode = yamlNode[CONFIG_YAML_OUTPUT_PORTS_KEY];
YAML::Node remoteProcessingGroupsNode = [&] {
// assignment is not supported on invalid Yaml nodes
YAML::Node candidate = yamlNode[CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY];
@@ -106,9 +109,8 @@ std::unique_ptr<core::ProcessGroup>
YamlConfiguration::parseProcessGroupYaml(con
parseProcessorNodeYaml(processorsNode, group.get());
parseRemoteProcessGroupYaml(remoteProcessingGroupsNode, group.get());
parseFunnelsYaml(funnelsNode, group.get());
- // parse connections last to give feedback if the source and/or destination
- // is not in the same process group
- parseConnectionYaml(connectionsNode, group.get());
+ parsePorts(inputPortsNode, group.get(), PortType::INPUT);
+ parsePorts(outputPortsNode, group.get(), PortType::OUTPUT);
if (childProcessGroupNodeSeq && childProcessGroupNodeSeq.IsSequence()) {
for (YAML::const_iterator it = childProcessGroupNodeSeq.begin(); it !=
childProcessGroupNodeSeq.end(); ++it) {
@@ -116,6 +118,10 @@ std::unique_ptr<core::ProcessGroup>
YamlConfiguration::parseProcessGroupYaml(con
group->addProcessGroup(parseProcessGroupYaml(childProcessGroupNode,
childProcessGroupNode));
}
}
+
+ // parse connections last to give feedback if the source and/or destination
processors
+ // are not in the same process group or input/output port connections are not
allowed
+ parseConnectionYaml(connectionsNode, group.get());
return group;
}
@@ -773,7 +779,7 @@ void YamlConfiguration::parseFunnelsYaml(const YAML::Node&
node, core::ProcessGr
throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect funnel UUID
format.");
});
- auto funnel = std::make_unique<core::Funnel>(name, uuid.value());
+ auto funnel = std::make_unique<Funnel>(name, uuid.value());
logger_->log_debug("Created funnel with UUID %s and name %s", id, name);
funnel->setScheduledState(core::RUNNING);
funnel->setSchedulingStrategy(core::EVENT_DRIVEN);
@@ -781,6 +787,36 @@ void YamlConfiguration::parseFunnelsYaml(const YAML::Node&
node, core::ProcessGr
}
}
+void YamlConfiguration::parsePorts(const YAML::Node& node, core::ProcessGroup*
parent, PortType port_type) {
+ if (!parent) {
+ logger_->log_error("parsePorts: no parent group was provided");
+ return;
+ }
+ if (!node || !node.IsSequence()) {
+ return;
+ }
+
+ for (const auto& element : node) {
+ const auto port_node = element.as<YAML::Node>();
+
+ std::string id = getOrGenerateId(port_node);
+
+ // Default name to be same as ID
+ const auto name = port_node["name"].as<std::string>(id);
+
+ const auto uuid = utils::Identifier::parse(id) | utils::orElse([this] {
+ logger_->log_debug("Incorrect port UUID format.");
+ throw Exception(ExceptionType::GENERAL_EXCEPTION, "Incorrect port UUID
format.");
+ });
+
+ auto port = std::make_unique<Port>(name, uuid.value(), port_type);
+ logger_->log_debug("Created port UUID %s and name %s", id, name);
+ port->setScheduledState(core::RUNNING);
+ port->setSchedulingStrategy(core::EVENT_DRIVEN);
+ parent->addPort(std::move(port));
+ }
+}
+
void YamlConfiguration::validateComponentProperties(ConfigurableComponent&
component, const std::string &component_name, const std::string &yaml_section)
const {
const auto &component_properties = component.getProperties();
diff --git a/libminifi/src/core/yaml/YamlConnectionParser.cpp
b/libminifi/src/core/yaml/YamlConnectionParser.cpp
index 5e246d8bc..9435be361 100644
--- a/libminifi/src/core/yaml/YamlConnectionParser.cpp
+++ b/libminifi/src/core/yaml/YamlConnectionParser.cpp
@@ -18,6 +18,7 @@
#include "core/yaml/YamlConnectionParser.h"
#include "core/yaml/CheckRequiredField.h"
+#include "Funnel.h"
namespace org::apache::nifi::minifi::core::yaml {
@@ -41,8 +42,8 @@ void
YamlConnectionParser::addFunnelRelationshipToConnection(minifi::Connection&
}
auto& processor_ref = *processor;
- if (typeid(minifi::core::Funnel) == typeid(processor_ref)) {
- addNewRelationshipToConnection(minifi::core::Funnel::Success.getName(),
connection);
+ if (typeid(minifi::Funnel) == typeid(processor_ref)) {
+ addNewRelationshipToConnection(minifi::Funnel::Success.getName(),
connection);
}
}
diff --git a/libminifi/test/resources/TestProcessGroup.yml
b/libminifi/test/resources/TestProcessGroup.yml
new file mode 100644
index 000000000..0b1ee75a3
--- /dev/null
+++ b/libminifi/test/resources/TestProcessGroup.yml
@@ -0,0 +1,69 @@
+MiNiFi Config Version: 3
+Flow Controller:
+ name: MiNiFi Flow
+Processors:
+- name: GenerateFlowFile
+ id: 4812b638-2f79-4dc2-9693-847a90399cbd
+ class: org.apache.nifi.minifi.processors.GenerateFlowFile
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 100 ms
+ penalization period: 1000 ms
+ Properties:
+ Batch Size: '1'
+ Data Format: Binary
+ File Size: 10 B
+ Unique FlowFiles: 'true'
+- name: Log attributes
+ id: 2ecd4bb4-b103-43fe-a45d-6a79b12da79b
+ class: org.apache.nifi.minifi.processors.LogAttribute
+ scheduling strategy: EVENT_DRIVEN
+ auto-terminated relationships list:
+ - success
+ Properties:
+ FlowFiles To Log: '0'
+Connections:
+- name: GenerateFlowFile/success/ProcessGroup
+ id: 492bc370-5d4c-4657-952f-3d6093147ad8
+ source id: 4812b638-2f79-4dc2-9693-847a90399cbd
+ source relationship names:
+ - success
+ destination id: 012fc536-3137-4360-be65-3e3b47e05941
+- name: ProcessGroup/success/LogAttribute
+ id: 12656e8e-0b91-4694-a2b7-3aa147574cd2
+ source id: 46dd8c65-8255-4980-8b7e-4381da00867a
+ source relationship names:
+ - success
+ destination id: 2ecd4bb4-b103-43fe-a45d-6a79b12da79b
+Controller Services: []
+Remote Process Groups: []
+Process Groups:
+ - id: 0a3aaf32-8574-4fa7-b720-84001f8dd71a
+ name: Update the attributes
+ Processors:
+ - id: 11624e01-baca-4590-bb9d-512ae2616615
+ name: UpdateAttribute
+ class: org.apache.nifi.minifi.processors.UpdateAttribute
+ scheduling strategy: EVENT_DRIVEN
+ auto-terminated relationships list:
+ - failure
+ Properties:
+ test_attribute: success
+ Input Ports:
+ - id: 012fc536-3137-4360-be65-3e3b47e05941
+ name: in
+ Output Ports:
+ - id: 46dd8c65-8255-4980-8b7e-4381da00867a
+ name: out
+ Connections:
+ - name: Input/success/UpdateAttribute
+ id: 2d33779c-2305-4e1a-88b8-1d2b6a9b134c
+ source id: 012fc536-3137-4360-be65-3e3b47e05941
+ source relationship names:
+ - success
+ destination id: 11624e01-baca-4590-bb9d-512ae2616615
+ - name: UpdateAttribute/success/Output
+ id: 5af95cc8-455a-4d5e-afbb-e16699407ed2
+ source id: 11624e01-baca-4590-bb9d-512ae2616615
+ source relationship names:
+ - success
+ destination id: 46dd8c65-8255-4980-8b7e-4381da00867a