Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 51cdbb773 -> e0d45609b
MINIFICPP-634: Add RPG and tests This closes #412. Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/e0d45609 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/e0d45609 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/e0d45609 Branch: refs/heads/master Commit: e0d45609bff83f89decfeb8a8fc7700020f19110 Parents: 51cdbb7 Author: Marc Parisi <[email protected]> Authored: Thu Oct 4 16:03:56 2018 -0400 Committer: Aldrin Piri <[email protected]> Committed: Thu Oct 11 22:37:39 2018 -0400 ---------------------------------------------------------------------- libminifi/src/RemoteProcessorGroupPort.cpp | 21 +++++--- libminifi/test/unit/ProcessorTests.cpp | 69 +++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e0d45609/libminifi/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index 8bed630..7270361 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -33,6 +33,7 @@ #include <utility> #include "sitetosite/Peer.h" +#include "Exception.h" #include "sitetosite/SiteToSiteFactory.h" #include "rapidjson/document.h" @@ -162,16 +163,19 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon * we must rely on the configured host/port */ if (peers_.empty() && is_http_disabled()) { - std::string host; - int configured_port; + std::string host, portStr; + int configured_port = -1; + // place hostname/port into the log message if we have it context->getProperty(hostName.getName(), host); - - int64_t lvalue; - if (context->getProperty(port.getName(), value) && !value.empty() && core::Property::StringToInt(value, lvalue)) { - configured_port = static_cast<int>(lvalue); + context->getProperty(port.getName(), portStr); + if (!host.empty() && !portStr.empty() && !portStr.empty() && core::Property::StringToInt(portStr, configured_port)) { + nifi_instances_.push_back({ host, configured_port, "" }); + bypass_rest_api_ = true; + } else { + // we cannot proceed, so log error and throw an exception + logger_->log_error("%s/%s/%d -- configuration values after eval of configuration options", host, portStr, configured_port); + throw(Exception(SITE2SITE_EXCEPTION, "HTTPClient not resolvable. No peers configured or any port specific hostname and port -- cannot schedule")); } - nifi_instances_.push_back({ host, configured_port, "" }); - bypass_rest_api_ = true; } // populate the site2site protocol for load balancing between them if (peers_.size() > 0) { @@ -194,6 +198,7 @@ void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessCon } } else { // we don't have any peers + logger_->log_error("No peers selected during scheduling"); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e0d45609/libminifi/test/unit/ProcessorTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp index 32ba181..88f9b77 100644 --- a/libminifi/test/unit/ProcessorTests.cpp +++ b/libminifi/test/unit/ProcessorTests.cpp @@ -380,6 +380,75 @@ TEST_CASE("TestEmptyContent", "[emptyContent]") { LogTestController::getInstance().reset(); } +/** + * Tests the RPG bypass feature + * @param host to configure + * @param port port string to configure + * @param portVal port value to search in the corresponding log message + * @param hasException dictates if a failure should occur + */ +void testRPGBypass(const std::string &host, const std::string &port, const std::string &portVal = "-1", bool hasException = true) { + TestController testController; + LogTestController::getInstance().setTrace<minifi::RemoteProcessorGroupPort>(); + LogTestController::getInstance().setTrace<minifi::core::ProcessSession>(); + LogTestController::getInstance().setTrace<TestPlan>(); + + auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); + auto factory = minifi::io::StreamFactory::getInstance(configuration); + + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + + std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); + + auto rpg = std::make_shared<minifi::RemoteProcessorGroupPort>(factory, "rpg", "http://localhost:8989/nifi", configuration); + rpg->setProperty(minifi::RemoteProcessorGroupPort::hostName, host); + rpg->setProperty(minifi::RemoteProcessorGroupPort::port, port); + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(rpg); + std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; + auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); + auto psf = std::make_shared<core::ProcessSessionFactory>(context); + if (hasException) { + auto expected_error = "Site2Site Protocol:HTTPClient not resolvable. No peers configured or any port specific hostname and port -- cannot schedule"; + try { + rpg->onSchedule(context, psf); + } catch (std::exception &e) { + REQUIRE(expected_error == std::string(e.what())); + } + std::stringstream search_expr; + search_expr << " " << host << "/" << port << "/" << portVal << " -- configuration values after eval of configuration options"; + REQUIRE(LogTestController::getInstance().contains(search_expr.str())); + } + LogTestController::getInstance().reset(); +} + +/** + * Since there is no curl loaded in this test folder, we will have is_http_disabled be true. + */ +TEST_CASE("TestRPGNoSettings", "[TestRPG1]") { + testRPGBypass("", ""); +} + +TEST_CASE("TestRPGWithHost", "[TestRPG2]") { + testRPGBypass("hostname", ""); +} + +TEST_CASE("TestRPGWithHostInvalidPort", "[TestRPG3]") { + testRPGBypass("hostname", "hostname"); +} + +TEST_CASE("TestRPGWithoutHostValidPort", "[TestRPG4]") { + testRPGBypass("", "8080"); +} + +TEST_CASE("TestRPGWithoutHostInvalidPort", "[TestRPG5]") { + testRPGBypass("", "hostname"); +} + +TEST_CASE("TestRPGValid", "[TestRPG6]") { + testRPGBypass("", "8080", "8080", false); +} + int fileSize(const char *add) { std::ifstream mySource; mySource.open(add, std::ios_base::binary);
