Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 372f2d6f8 -> c9940e945
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/InvokeHTTPTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/InvokeHTTPTests.cpp b/libminifi/test/unit/InvokeHTTPTests.cpp index 6b96549..0200710 100644 --- a/libminifi/test/unit/InvokeHTTPTests.cpp +++ b/libminifi/test/unit/InvokeHTTPTests.cpp @@ -18,13 +18,13 @@ #include <uuid/uuid.h> #include <fstream> #include "FlowController.h" -#include "ProvenanceTestHelper.h" #include "../TestBase.h" #include "core/logging/LogAppenders.h" #include "core/logging/BaseLogger.h" #include "processors/GetFile.h" #include "core/Core.h" #include "../../include/core/FlowFile.h" +#include "../unit/ProvenanceTestHelper.h" #include "core/Processor.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" @@ -35,7 +35,8 @@ TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") { std::stringstream oss; std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr< logging::BaseLogger>( - new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,0)); + new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss, + 0)); std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger(); logger->updateLogger(std::move(outputLogger)); @@ -43,9 +44,7 @@ TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") { testController.enableDebug(); - - std::shared_ptr<TestRepository> repo = std::make_shared< - TestRepository>(); + std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); std::shared_ptr<core::Processor> processor = std::make_shared< org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp"); @@ -58,26 +57,23 @@ TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") { uuid_t invokehttp_uuid; REQUIRE(true == invokehttp->getUUID(invokehttp_uuid)); - std::shared_ptr<minifi::Connection> connection = std::make_shared< - minifi::Connection>(repo,"getfileCreate2Connection"); + minifi::Connection>(repo, "getfileCreate2Connection"); connection->setRelationship(core::Relationship("success", "description")); std::shared_ptr<minifi::Connection> connection2 = std::make_shared< - minifi::Connection>(repo,"listenhttp"); + minifi::Connection>(repo, "listenhttp"); connection2->setRelationship(core::Relationship("No Retry", "description")); // link the connections so that we can test results at the end for this connection->setSource(processor); - // link the connections so that we can test results at the end for this connection->setDestination(invokehttp); connection2->setSource(invokehttp); - connection2->setSourceUUID(invokehttp_uuid); connection->setSourceUUID(processoruuid); connection->setDestinationUUID(invokehttp_uuid); @@ -86,21 +82,23 @@ TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") { invokehttp->addConnection(connection); invokehttp->addConnection(connection2); - core::ProcessorNode node(processor); core::ProcessorNode node2(invokehttp); - core::ProcessContext context(node, repo); - core::ProcessContext context2(node2, repo); + std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = + nullptr; + core::ProcessContext context(node, controller_services_provider, repo); + core::ProcessContext context2(node2, controller_services_provider, repo); context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8685"); - context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, - "/testytesttest"); + context.setProperty( + org::apache::nifi::minifi::processors::ListenHTTP::BasePath, + "/testytesttest"); - context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, - "POST"); + context2.setProperty( + org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST"); context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, - "http://localhost:8685/testytesttest"); + "http://localhost:8685/testytesttest"); core::ProcessSession session(&context); core::ProcessSession session2(&context2); @@ -125,7 +123,6 @@ TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") { REQUIRE(record == nullptr); REQUIRE(records.size() == 0); - processor->incrementActiveTasks(); processor->setScheduledState(core::ScheduledState::RUNNING); processor->onTrigger(&context, &session); @@ -142,25 +139,24 @@ TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") { session2.commit(); records = reporter->getEvents(); - - for (provenance::ProvenanceEventRecord *provEventRecord : records) { REQUIRE(provEventRecord->getComponentType() == processor->getName()); } std::shared_ptr<core::FlowFile> ffr = session2.get(); std::string log_attribute_output = oss.str(); -std::cout << log_attribute_output << std::endl; - REQUIRE( log_attribute_output.find("exiting because method is POST") != std::string::npos ); + REQUIRE( + log_attribute_output.find("exiting because method is POST") + != std::string::npos); } - TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") { std::stringstream oss; std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr< logging::BaseLogger>( - new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,0)); + new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss, + 0)); std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger(); logger->updateLogger(std::move(outputLogger)); @@ -168,19 +164,16 @@ TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") { testController.enableDebug(); - - - std::shared_ptr<TestRepository> repo = std::make_shared< - TestRepository>(); + std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); std::shared_ptr<core::Processor> getfileprocessor = std::make_shared< - org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); + org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); - std::shared_ptr<core::Processor> logAttribute = std::make_shared< - org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); + std::shared_ptr<core::Processor> logAttribute = std::make_shared< + org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); - char format[] = "/tmp/gt.XXXXXX"; - char *dir = testController.createTempDirectory(format); + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); std::shared_ptr<core::Processor> listenhttp = std::make_shared< org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp"); @@ -193,30 +186,26 @@ TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") { uuid_t invokehttp_uuid; REQUIRE(true == invokehttp->getUUID(invokehttp_uuid)); - std::shared_ptr<minifi::Connection> gcConnection = std::make_shared< - minifi::Connection>(repo, "getfileCreate2Connection"); + minifi::Connection>(repo, "getfileCreate2Connection"); gcConnection->setRelationship(core::Relationship("success", "description")); -std::shared_ptr<minifi::Connection> laConnection = std::make_shared< - minifi::Connection>(repo, "logattribute"); -laConnection->setRelationship(core::Relationship("success", "description")); - - + std::shared_ptr<minifi::Connection> laConnection = std::make_shared< + minifi::Connection>(repo, "logattribute"); + laConnection->setRelationship(core::Relationship("success", "description")); std::shared_ptr<minifi::Connection> connection = std::make_shared< - minifi::Connection>(repo,"getfileCreate2Connection"); + minifi::Connection>(repo, "getfileCreate2Connection"); connection->setRelationship(core::Relationship("success", "description")); std::shared_ptr<minifi::Connection> connection2 = std::make_shared< - minifi::Connection>(repo,"listenhttp"); + minifi::Connection>(repo, "listenhttp"); connection2->setRelationship(core::Relationship("No Retry", "description")); // link the connections so that we can test results at the end for this connection->setSource(listenhttp); - connection2->setSourceUUID(invokehttp_uuid); connection->setSourceUUID(processoruuid); connection->setDestinationUUID(invokehttp_uuid); @@ -225,21 +214,22 @@ laConnection->setRelationship(core::Relationship("success", "description")); invokehttp->addConnection(connection); invokehttp->addConnection(connection2); - core::ProcessorNode node(listenhttp); core::ProcessorNode node2(invokehttp); - - core::ProcessContext context(node, repo); - core::ProcessContext context2(node2, repo); + std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = + nullptr; + core::ProcessContext context(node, controller_services_provider, repo); + core::ProcessContext context2(node2, controller_services_provider, repo); context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8686"); - context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, - "/testytesttest"); + context.setProperty( + org::apache::nifi::minifi::processors::ListenHTTP::BasePath, + "/testytesttest"); - context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, - "POST"); + context2.setProperty( + org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST"); context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, - "http://localhost:8686/testytesttest"); + "http://localhost:8686/testytesttest"); core::ProcessSession session(&context); core::ProcessSession session2(&context2); @@ -264,7 +254,6 @@ laConnection->setRelationship(core::Relationship("success", "description")); REQUIRE(record == nullptr); REQUIRE(records.size() == 0); - listenhttp->incrementActiveTasks(); listenhttp->setScheduledState(core::ScheduledState::RUNNING); listenhttp->onTrigger(&context, &session); @@ -281,32 +270,28 @@ laConnection->setRelationship(core::Relationship("success", "description")); session2.commit(); records = reporter->getEvents(); - - for (provenance::ProvenanceEventRecord *provEventRecord : records) { REQUIRE(provEventRecord->getComponentType() == listenhttp->getName()); } std::shared_ptr<core::FlowFile> ffr = session2.get(); std::string log_attribute_output = oss.str(); -std::cout << log_attribute_output << std::endl; - REQUIRE( log_attribute_output.find("exiting because method is POST") != std::string::npos ); + REQUIRE( + log_attribute_output.find("exiting because method is POST") + != std::string::npos); } - -class CallBack : public minifi::OutputStreamCallback -{ +class CallBack : public minifi::OutputStreamCallback { public: - CallBack() - { + CallBack() { - } - virtual ~CallBack(){ + } + virtual ~CallBack() { } - virtual void process(std::ofstream *stream){ + virtual void process(std::ofstream *stream) { std::string st = "we're gnna write some test stuff"; - stream->write(st.c_str(),st.length()); + stream->write(st.c_str(), st.length()); } }; @@ -315,7 +300,8 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") { std::stringstream oss; std::unique_ptr<logging::BaseLogger> outputLogger = std::unique_ptr< logging::BaseLogger>( - new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss,0)); + new org::apache::nifi::minifi::core::logging::OutputStreamAppender(oss, + 0)); std::shared_ptr<logging::Logger> logger = logging::Logger::getLogger(); logger->updateLogger(std::move(outputLogger)); @@ -323,19 +309,16 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") { testController.enableDebug(); - - - std::shared_ptr<TestRepository> repo = std::make_shared< - TestRepository>(); + std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); std::shared_ptr<core::Processor> getfileprocessor = std::make_shared< - org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); + org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); - std::shared_ptr<core::Processor> logAttribute = std::make_shared< - org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); + std::shared_ptr<core::Processor> logAttribute = std::make_shared< + org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); - char format[] = "/tmp/gt.XXXXXX"; - char *dir = testController.createTempDirectory(format); + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); std::shared_ptr<core::Processor> listenhttp = std::make_shared< org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp"); @@ -348,23 +331,20 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") { uuid_t invokehttp_uuid; REQUIRE(true == invokehttp->getUUID(invokehttp_uuid)); - std::shared_ptr<minifi::Connection> gcConnection = std::make_shared< - minifi::Connection>(repo, "getfileCreate2Connection"); + minifi::Connection>(repo, "getfileCreate2Connection"); gcConnection->setRelationship(core::Relationship("success", "description")); -std::shared_ptr<minifi::Connection> laConnection = std::make_shared< - minifi::Connection>(repo, "logattribute"); -laConnection->setRelationship(core::Relationship("success", "description")); - - + std::shared_ptr<minifi::Connection> laConnection = std::make_shared< + minifi::Connection>(repo, "logattribute"); + laConnection->setRelationship(core::Relationship("success", "description")); std::shared_ptr<minifi::Connection> connection = std::make_shared< - minifi::Connection>(repo,"getfileCreate2Connection"); + minifi::Connection>(repo, "getfileCreate2Connection"); connection->setRelationship(core::Relationship("success", "description")); std::shared_ptr<minifi::Connection> connection2 = std::make_shared< - minifi::Connection>(repo,"listenhttp"); + minifi::Connection>(repo, "listenhttp"); connection2->setRelationship(core::Relationship("No Retry", "description")); @@ -372,31 +352,31 @@ laConnection->setRelationship(core::Relationship("success", "description")); connection->setSource(listenhttp); connection->setSourceUUID(invokehttp_uuid); - connection->setDestinationUUID(processoruuid); + connection->setDestinationUUID(processoruuid); connection2->setSourceUUID(processoruuid); connection2->setSourceUUID(processoruuid); - listenhttp->addConnection(connection); invokehttp->addConnection(connection); invokehttp->addConnection(connection2); - core::ProcessorNode node(invokehttp); core::ProcessorNode node2(listenhttp); - - core::ProcessContext context(node, repo); - core::ProcessContext context2(node2, repo); + std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = + nullptr; + core::ProcessContext context(node, controller_services_provider, repo); + core::ProcessContext context2(node2, controller_services_provider, repo); context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8680"); - context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, - "/testytesttest"); + context.setProperty( + org::apache::nifi::minifi::processors::ListenHTTP::BasePath, + "/testytesttest"); - context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, - "POST"); + context2.setProperty( + org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST"); context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, - "http://localhost:8680/testytesttest"); + "http://localhost:8680/testytesttest"); core::ProcessSession session(&context); core::ProcessSession session2(&context2); @@ -410,13 +390,14 @@ laConnection->setRelationship(core::Relationship("success", "description")); /* explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, - std::map<std::string, std::string> attributes, - std::shared_ptr<ResourceClaim> claim = nullptr); + std::map<std::string, std::string> attributes, + std::shared_ptr<ResourceClaim> claim = nullptr); */ - std::map<std::string,std::string> attributes; + std::map<std::string, std::string> attributes; attributes["testy"] = "test"; - std::shared_ptr<minifi::FlowFileRecord> flow = std::make_shared<minifi::FlowFileRecord>(repo,attributes); - session2.write(flow,&callback); + std::shared_ptr<minifi::FlowFileRecord> flow = std::make_shared< + minifi::FlowFileRecord>(repo, attributes); + session2.write(flow, &callback); invokehttp->incrementActiveTasks(); invokehttp->setScheduledState(core::ScheduledState::RUNNING); @@ -429,15 +410,12 @@ laConnection->setRelationship(core::Relationship("success", "description")); listenhttp->onSchedule(&context, &factory); listenhttp->onTrigger(&context, &session); - - provenance::ProvenanceReporter *reporter = session.getProvenanceReporter(); std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents(); record = session.get(); REQUIRE(record == nullptr); REQUIRE(records.size() == 0); - listenhttp->incrementActiveTasks(); listenhttp->setScheduledState(core::ScheduledState::RUNNING); listenhttp->onTrigger(&context, &session); @@ -454,18 +432,14 @@ laConnection->setRelationship(core::Relationship("success", "description")); session2.commit(); records = reporter->getEvents(); - - for (provenance::ProvenanceEventRecord *provEventRecord : records) { REQUIRE(provEventRecord->getComponentType() == listenhttp->getName()); } std::shared_ptr<core::FlowFile> ffr = session2.get(); std::string log_attribute_output = oss.str(); -std::cout << log_attribute_output << std::endl; - REQUIRE( log_attribute_output.find("exiting because method is POST") != std::string::npos ); + REQUIRE( + log_attribute_output.find("exiting because method is POST") + != std::string::npos); } - - - http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/MockClasses.h ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/MockClasses.h b/libminifi/test/unit/MockClasses.h new file mode 100644 index 0000000..d32184b --- /dev/null +++ b/libminifi/test/unit/MockClasses.h @@ -0,0 +1,139 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_TEST_UNIT_MOCKCLASSES_H_ +#define LIBMINIFI_TEST_UNIT_MOCKCLASSES_H_ + +#include "core/controller/ControllerService.h" +#include "core/Processor.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +std::atomic<bool> disabled; + +class MockControllerService : public core::controller::ControllerService { + public: + explicit MockControllerService(const std::string &name, const std::string &id) + : ControllerService(name, id) { + + } + + explicit MockControllerService(const std::string &name, uuid_t uuid) + : ControllerService(name, uuid) { + + } + + explicit MockControllerService(const std::string &name) + : ControllerService(name, 0) { + + } + MockControllerService() { + + } + + ~MockControllerService() { + + } + + virtual void initialize() { + core::controller::ControllerService::initialize(); + enable(); + } + + std::string doSomething() { + return str; + } + + virtual void enable() { + str = "pushitrealgood"; + } + + void yield() { + + } + + bool isRunning() { + return true; + } + + bool isWorkAvailable() { + return true; + } + protected: + std::string str; +}; + +class MockProcessor : public core::Processor { + public: + + explicit MockProcessor(const std::string &name, uuid_t uuid) + : Processor(name, uuid) { + setTriggerWhenEmpty(true); + } + + explicit MockProcessor(const std::string &name) + : Processor(name, 0) { + setTriggerWhenEmpty(true); + } + + ~MockProcessor() { + + } + + virtual void initialize() { + core::Property property("linkedService", "Linked service"); + std::set<core::Property> properties; + properties.insert(property); + setSupportedProperties(properties); + + } + + // OnTrigger method, implemented by NiFi Processor Designer + virtual void onTrigger(core::ProcessContext *context, + core::ProcessSession *session) { + + std::string linked_service = ""; + getProperty("linkedService", linked_service); + if (!IsNullOrEmpty(linked_service)) { + + std::shared_ptr<core::controller::ControllerService> service = context + ->getControllerService(linked_service); + + if (!disabled.load()) { + assert(true == context->isControllerServiceEnabled(linked_service)); + assert(nullptr != service); + assert( + "pushitrealgood" + == std::static_pointer_cast<MockControllerService>(service) + ->doSomething()); + } else { + assert(false == context->isControllerServiceEnabled(linked_service)); + } + + //verify we have access to the controller service + // and verify that we can execute it. + + } + } + + bool isYield() { + return false; + } + +}; + +#endif /* LIBMINIFI_TEST_UNIT_MOCKCLASSES_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/ProvenanceTestHelper.h ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index 58ae870..67b5c65 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -93,7 +93,7 @@ class TestRepository : public core::Repository { class TestFlowRepository : public core::repository::FlowFileRepository { public: TestFlowRepository() - : core::repository::FlowFileRepository("./", 1000, 100, 0) { + : core::repository::FlowFileRepository("./dir", 1000, 100, 0) { } // initialize bool initialize() { @@ -154,9 +154,7 @@ class TestFlowRepository : public core::repository::FlowFileRepository { std::map<std::string, std::string> repositoryResults; }; - - -class TestFlowController : public minifi::FlowController { +class TestFlowController : public minifi::FlowController{ public: TestFlowController(std::shared_ptr<core::Repository> repo, http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/ProvenanceTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProvenanceTests.cpp b/libminifi/test/unit/ProvenanceTests.cpp index 2e41cc8..6a134ed 100644 --- a/libminifi/test/unit/ProvenanceTests.cpp +++ b/libminifi/test/unit/ProvenanceTests.cpp @@ -17,8 +17,8 @@ */ #include "../TestBase.h" +#include "../unit/ProvenanceTestHelper.h" -#include "ProvenanceTestHelper.h" #include "provenance/Provenance.h" #include "FlowFileRecord.h" #include "core/Core.h" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/RepoTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/RepoTests.cpp b/libminifi/test/unit/RepoTests.cpp index de51ead..c8deb89 100644 --- a/libminifi/test/unit/RepoTests.cpp +++ b/libminifi/test/unit/RepoTests.cpp @@ -17,8 +17,8 @@ */ #include "../TestBase.h" +#include "../unit/ProvenanceTestHelper.h" -#include "ProvenanceTestHelper.h" #include "provenance/Provenance.h" #include "FlowFileRecord.h" #include "core/Core.h" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/SerializationTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/SerializationTests.cpp b/libminifi/test/unit/SerializationTests.cpp index 0841fbd..039ed57 100644 --- a/libminifi/test/unit/SerializationTests.cpp +++ b/libminifi/test/unit/SerializationTests.cpp @@ -22,11 +22,12 @@ #include <uuid/uuid.h> #include "core/logging/LogAppenders.h" #include "core/logging/BaseLogger.h" -#include "SiteToSiteHelper.h" #include <algorithm> #include <string> #include <memory> + #include "../TestBase.h" +#include "../unit/SiteToSiteHelper.h" #define FMT_DEFAULT fmt_lower using namespace org::apache::nifi::minifi::io; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/Site2SiteTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/Site2SiteTests.cpp b/libminifi/test/unit/Site2SiteTests.cpp index d82a3cf..5af86d5 100644 --- a/libminifi/test/unit/Site2SiteTests.cpp +++ b/libminifi/test/unit/Site2SiteTests.cpp @@ -22,11 +22,12 @@ #include <uuid/uuid.h> #include "core/logging/LogAppenders.h" #include "core/logging/BaseLogger.h" -#include "SiteToSiteHelper.h" #include <algorithm> #include <string> #include <memory> + #include "../TestBase.h" +#include "../unit/SiteToSiteHelper.h" #define FMT_DEFAULT fmt_lower using namespace org::apache::nifi::minifi::io; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/ThreadPoolTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ThreadPoolTests.cpp b/libminifi/test/unit/ThreadPoolTests.cpp new file mode 100644 index 0000000..5c85e19 --- /dev/null +++ b/libminifi/test/unit/ThreadPoolTests.cpp @@ -0,0 +1,38 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <future> +#include "../TestBase.h" +#include "utils/ThreadPool.h" + +bool function() { + return true; +} + +TEST_CASE("ThreadPoolTest1", "[TPT1]") { + utils::ThreadPool<bool> pool(5); + std::function<bool()> f_ex = function; + utils::Worker<bool> functor(f_ex); + pool.start(); + std::future<bool> fut = pool.execute(std::move(functor)); + + fut.wait(); + + REQUIRE(true == fut.get()); + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/YamlCongifurationTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/YamlCongifurationTests.cpp b/libminifi/test/unit/YamlCongifurationTests.cpp index 3f804ab..0c229d0 100644 --- a/libminifi/test/unit/YamlCongifurationTests.cpp +++ b/libminifi/test/unit/YamlCongifurationTests.cpp @@ -127,7 +127,7 @@ TEST_CASE("Test YAML Config 1", "[testyamlconfig1]") { " timeout: 30 secs\n" " batch size: 1000"; - core::YamlConfiguration *yamlConfig = new core::YamlConfiguration(TEST_PROV_REPO, TEST_FF_REPO, std::make_shared<minifi::io::StreamFactory>(std::make_shared<minifi::Configure>())); + core::YamlConfiguration *yamlConfig = new core::YamlConfiguration(TEST_PROV_REPO, TEST_FF_REPO, std::make_shared<minifi::io::StreamFactory>(std::make_shared<minifi::Configure>()),std::make_shared<minifi::Configure>()); std::istringstream yamlstream(TEST_YAML_WITHOUT_IDS); std::unique_ptr<core::ProcessGroup> rootFlowConfig = yamlConfig->getRoot(yamlstream); @@ -178,7 +178,7 @@ TEST_CASE("Test YAML Config Missing Required Fields", "[testyamlconfig2]") { " use compression: false\n" "\n"; - core::YamlConfiguration *yamlConfig = new core::YamlConfiguration(TEST_PROV_REPO, TEST_FF_REPO, std::make_shared<minifi::io::StreamFactory>(std::make_shared<minifi::Configure>())); + core::YamlConfiguration *yamlConfig = new core::YamlConfiguration(TEST_PROV_REPO, TEST_FF_REPO, std::make_shared<minifi::io::StreamFactory>(std::make_shared<minifi::Configure>()),std::make_shared<minifi::Configure>()); std::istringstream yamlstream(TEST_YAML_NO_RPG_PORT_ID); REQUIRE_THROWS_AS(yamlConfig->getRoot(yamlstream), std::invalid_argument); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/resource/TestHTTPGet.yml ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/resource/TestHTTPGet.yml b/libminifi/test/unit/resource/TestHTTPGet.yml deleted file mode 100644 index 0783b8e..0000000 --- a/libminifi/test/unit/resource/TestHTTPGet.yml +++ /dev/null @@ -1,73 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -Flow Controller: - name: MiNiFi Flow - id: 2438e3c8-015a-1000-79ca-83af40ec1990 -Processors: - - name: invoke - id: 2438e3c8-015a-1000-79ca-83af40ec1991 - class: org.apache.nifi.processors.standard.InvokeHTTP - max concurrent tasks: 1 - scheduling strategy: TIMER_DRIVEN - scheduling period: 1 sec - penalization period: 30 sec - yield period: 1 sec - run duration nanos: 0 - auto-terminated relationships list: - Properties: - HTTP Method: GET - Remote URL: https://curl.haxx.se/libcurl/c/httpput.html - - name: OhJeez - id: 2438e3c8-015a-1000-79ca-83af40ec1992 - class: org.apache.nifi.processors.standard.LogAttribute - max concurrent tasks: 1 - scheduling strategy: TIMER_DRIVEN - scheduling period: 1 sec - penalization period: 30 sec - yield period: 1 sec - run duration nanos: 0 - auto-terminated relationships list: response - Properties: - Log Level: info - Log Payload: true - -Connections: - - name: TransferFilesToRPG - id: 2438e3c8-015a-1000-79ca-83af40ec1997 - source name: invoke - source id: 2438e3c8-015a-1000-79ca-83af40ec1991 - source relationship name: success - destination name: OhJeez - destination id: 2438e3c8-015a-1000-79ca-83af40ec1992 - max work queue size: 0 - max work queue data size: 1 MB - flowfile expiration: 60 sec - - name: TransferFilesToRPG2 - id: 2438e3c8-015a-1000-79ca-83af40ec1917 - source name: OhJeez - source id: 2438e3c8-015a-1000-79ca-83af40ec1992 - destination name: OhJeez - destination id: 2438e3c8-015a-1000-79ca-83af40ec1992 - source relationship name: success - max work queue size: 0 - max work queue data size: 1 MB - flowfile expiration: 60 sec - -Remote Processing Groups: - \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/libminifi/test/unit/resource/TestHTTPPost.yml ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/resource/TestHTTPPost.yml b/libminifi/test/unit/resource/TestHTTPPost.yml deleted file mode 100644 index 837194d..0000000 --- a/libminifi/test/unit/resource/TestHTTPPost.yml +++ /dev/null @@ -1,87 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -Flow Controller: - name: MiNiFi Flow - id: 2438e3c8-015a-1000-79ca-83af40ec1990 -Processors: - - name: invoke - id: 2438e3c8-015a-1000-79ca-83af40ec1991 - class: org.apache.nifi.processors.standard.GetFile - max concurrent tasks: 1 - scheduling strategy: TIMER_DRIVEN - scheduling period: 1 sec - penalization period: 30 sec - yield period: 1 sec - run duration nanos: 0 - auto-terminated relationships list: - Properties: - Input Directory: /tmp/aljr39 - Keep Source File: false - - - name: OhJeez - id: 2438e3c8-015a-1000-79ca-83af40ec1992 - class: org.apache.nifi.processors.standard.InvokeHTTP - max concurrent tasks: 1 - scheduling strategy: TIMER_DRIVEN - scheduling period: 1 sec - penalization period: 30 sec - yield period: 1 sec - run duration nanos: 0 - auto-terminated relationships list: response - Properties: - HTTP Method: POST - Remote URL: http://requestb.in/u8ax9uu8 - - - name: Loggit - id: 2438e3c8-015a-1000-79ca-83af40ec1993 - class: org.apache.nifi.processors.standard.LogAttribute - max concurrent tasks: 1 - scheduling strategy: TIMER_DRIVEN - scheduling period: 1 sec - penalization period: 30 sec - yield period: 1 sec - run duration nanos: 0 - auto-terminated relationships list: response - Properties: - LogLevel: info - -Connections: - - name: TransferFilesToRPG - id: 2438e3c8-015a-1000-79ca-83af40ec1997 - source name: invoke - source id: 2438e3c8-015a-1000-79ca-83af40ec1991 - source relationship name: success - destination name: OhJeez - destination id: 2438e3c8-015a-1000-79ca-83af40ec1992 - max work queue size: 0 - max work queue data size: 1 MB - flowfile expiration: 60 sec - - name: TransferFilesToRPG2 - id: 2438e3c8-015a-1000-79ca-83af40ec1917 - source name: OhJeez - source id: 2438e3c8-015a-1000-79ca-83af40ec1992 - destination name: OhJeez - destination id: 2438e3c8-015a-1000-79ca-83af40ec1993 - source relationship name: success - max work queue size: 0 - max work queue data size: 1 MB - flowfile expiration: 60 sec - -Remote Processing Groups: - \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/main/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt index a0e43b8..87506fa 100644 --- a/main/CMakeLists.txt +++ b/main/CMakeLists.txt @@ -23,7 +23,7 @@ IF(POLICY CMP0048) CMAKE_POLICY(SET CMP0048 OLD) ENDIF(POLICY CMP0048) -include_directories(../include ../libminifi/include ../libminifi/include/core/yaml ../libminifi/include/core ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../thirdparty/civetweb-1.9.1/include ../thirdparty/jsoncpp/include ../thirdparty/leveldb-1.18/include ../thirdparty/) +include_directories(../include ../libminifi/include ../libminifi/include/processors/ ../libminifi/include/core/yaml ../libminifi/include/core ../thirdparty/yaml-cpp-yaml-cpp-0.5.3/include ../thirdparty/civetweb-1.9.1/include ../thirdparty/leveldb-1.18/include ../thirdparty/) find_package(Boost REQUIRED) include_directories(${Boost_INCLUDE_DIRS}) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/c9940e94/main/MiNiFiMain.cpp ---------------------------------------------------------------------- diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp index daf4a8f..3eb16ae 100644 --- a/main/MiNiFiMain.cpp +++ b/main/MiNiFiMain.cpp @@ -60,7 +60,6 @@ // Variables that allow us to avoid a timed wait. sem_t *running; //! Flow Controller -static std::unique_ptr<minifi::FlowController> controller = nullptr; /** * Removed the stop command from the signal handler so that we could trigger @@ -178,10 +177,11 @@ int main(int argc, char **argv) { core::createFlowConfiguration(prov_repo, flow_repo, configure, stream_factory, nifi_configuration_class_name)); - controller = std::unique_ptr<minifi::FlowController>( + std::shared_ptr<minifi::FlowController> controller = std::unique_ptr<minifi::FlowController>( new minifi::FlowController(prov_repo, flow_repo, configure, std::move(flow_configuration))); + logger->log_info("Loading FlowController"); // Load flow from specified configuration file controller->load(); // Start Processing the flow
