http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/integration/HttpGetIntegrationTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/HttpGetIntegrationTest.cpp b/libminifi/test/integration/HttpGetIntegrationTest.cpp index ae60dc1..a235759 100644 --- a/libminifi/test/integration/HttpGetIntegrationTest.cpp +++ b/libminifi/test/integration/HttpGetIntegrationTest.cpp @@ -26,6 +26,7 @@ #include <thread> #include <type_traits> #include <vector> +#include "../TestServer.h" #include "../TestBase.h" #include "utils/StringUtils.h" #include "core/Core.h" @@ -41,9 +42,23 @@ void waitToVerifyProcessor() { std::this_thread::sleep_for(std::chrono::seconds(10)); } +int log_message(const struct mg_connection *conn, const char *message) { + puts(message); + return 1; +} + +int ssl_enable(void *ssl_context, void *user_data) { + struct ssl_ctx_st *ctx = (struct ssl_ctx_st *) ssl_context; + return 0; +} + int main(int argc, char **argv) { - LogTestController::getInstance().setInfo<minifi::processors::InvokeHTTP>(); - LogTestController::getInstance().setInfo<minifi::processors::LogAttribute>(); + init_webserver(); + LogTestController::getInstance().setDebug<core::Processor>(); + LogTestController::getInstance().setDebug<core::ProcessSession>(); + LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>(); + LogTestController::getInstance().setDebug<minifi::processors::InvokeHTTP>(); + LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); std::string key_dir, test_file_location; if (argc > 1) { test_file_location = argv[1]; @@ -59,27 +74,61 @@ int main(int argc, char **argv) { configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); - std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(new core::YamlConfiguration(test_repo, test_repo, stream_factory, configuration, test_file_location)); + + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + + content_repo->initialize(configuration); + + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( + new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); - std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), DEFAULT_ROOT_GROUP_NAME, - true); + std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), + content_repo, + DEFAULT_ROOT_GROUP_NAME, + true); - core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, configuration, test_file_location); + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get()); - ptr.release(); + std::shared_ptr<core::Processor> proc = ptr->findProcessor("invoke"); + assert(proc != nullptr); + std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc); + + assert(inv != nullptr); + std::string url = ""; + inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url); + ptr.release(); + std::string port, scheme, path; + parse_http_components(url, port, scheme, path); + struct mg_callbacks callback; + if (url.find("localhost") != std::string::npos) { + if (scheme == "https") { + std::string cert = ""; + cert = key_dir + "nifi-cert.pem"; + memset(&callback, 0, sizeof(callback)); + callback.init_ssl = ssl_enable; + callback.log_message = log_message; + std::cout << cert << std::endl; + start_webserver(port, path, "hi this is a get test", &callback, cert); + } else { + start_webserver(port, path, "hi this is a get test"); + } + } controller->load(); controller->start(); waitToVerifyProcessor(); controller->waitUnload(60000); + if (url.find("localhost") != std::string::npos) { + stop_webserver(); + } std::string logs = LogTestController::getInstance().log_output.str(); + assert(logs.find("key:filename value:") != std::string::npos); - assert(logs.find("key:invokehttp.request.url value:https://raw.githubusercontent.com/curl/curl/master/docs/examples/httpput.c") != std::string::npos); - assert(logs.find("Size:3734 Offset:0") != std::string::npos); + assert(logs.find("key:invokehttp.request.url value:" + url) != std::string::npos); assert(logs.find("key:invokehttp.status.code value:200") != std::string::npos); std::string stringtofind = "Resource Claim created ./content_repository/";
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/integration/HttpPostIntegrationTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/HttpPostIntegrationTest.cpp b/libminifi/test/integration/HttpPostIntegrationTest.cpp index dfa284f..9a46574 100644 --- a/libminifi/test/integration/HttpPostIntegrationTest.cpp +++ b/libminifi/test/integration/HttpPostIntegrationTest.cpp @@ -28,6 +28,7 @@ #include <vector> #include "utils/StringUtils.h" #include "core/Core.h" +#include "../TestServer.h" #include "../include/core/logging/Logger.h" #include "core/ProcessGroup.h" #include "core/yaml/YamlConfiguration.h" @@ -42,6 +43,7 @@ void waitToVerifyProcessor() { } int main(int argc, char **argv) { + init_webserver(); LogTestController::getInstance().setDebug<processors::InvokeHTTP>(); LogTestController::getInstance().setDebug<minifi::core::ProcessSession>(); std::string test_file_location; @@ -51,7 +53,6 @@ int main(int argc, char **argv) { mkdir("/tmp/aljr39/", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); std::ofstream myfile; myfile.open("/tmp/aljr39/example.txt"); - myfile << "Hello world" << std::endl; myfile.close(); mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); @@ -62,31 +63,44 @@ int main(int argc, char **argv) { configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); - - std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(new core::YamlConfiguration(test_repo, test_repo, stream_factory, configuration, test_file_location)); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + content_repo->initialize(configuration); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( + new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); - std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), - DEFAULT_ROOT_GROUP_NAME, - true); + std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, + true); - core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, configuration, test_file_location); + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get()); - ptr.release(); + std::shared_ptr<core::Processor> proc = ptr->findProcessor("OhJeez"); + assert(proc != nullptr); + + std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc); + assert(inv != nullptr); + std::string url = ""; + inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url); + ptr.release(); + std::string port, scheme, path; + parse_http_components(url, port, scheme, path); + start_webserver(port, path, "hi this is a post test"); controller->load(); controller->start(); waitToVerifyProcessor(); controller->waitUnload(60000); + std::string logs = LogTestController::getInstance().log_output.str(); + // stop webserver + stop_webserver(); assert(LogTestController::getInstance().contains("curl performed") == true); - assert(LogTestController::getInstance().contains("Import offset 0 length 12") == true); + assert(LogTestController::getInstance().contains("Import offset 0 length 22") == true); std::string stringtofind = "Resource Claim created ./content_repository/"; - std::string logs = LogTestController::getInstance().log_output.str(); size_t loc = logs.find(stringtofind); while (loc > 0 && loc != std::string::npos) { std::string id = logs.substr(loc + stringtofind.size(), 36); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/integration/ProvenanceReportingTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp b/libminifi/test/integration/ProvenanceReportingTest.cpp index a7bcc2b..a6dc377 100644 --- a/libminifi/test/integration/ProvenanceReportingTest.cpp +++ b/libminifi/test/integration/ProvenanceReportingTest.cpp @@ -53,21 +53,20 @@ int main(int argc, char **argv) { LogTestController::getInstance().setDebug<core::ProcessGroup>(); std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>(); configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); - - std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(new core::YamlConfiguration(test_repo, test_repo, stream_factory, configuration, test_file_location)); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( + new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); - std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), - DEFAULT_ROOT_GROUP_NAME, + std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, true); - core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, configuration, test_file_location); + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get()); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/integration/Site2SiteRestTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/Site2SiteRestTest.cpp b/libminifi/test/integration/Site2SiteRestTest.cpp index 01aa7a8..1773cdb 100644 --- a/libminifi/test/integration/Site2SiteRestTest.cpp +++ b/libminifi/test/integration/Site2SiteRestTest.cpp @@ -45,22 +45,22 @@ void waitToVerifyProcessor() { std::this_thread::sleep_for(std::chrono::seconds(10)); } -class ConfigHandler: public CivetHandler { +class ConfigHandler : public CivetHandler { public: bool handleGet(CivetServer *server, struct mg_connection *conn) { static const std::string site2site_rest_resp = "{" - "\"revision\": {" - "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\"" - "}," - "\"controller\": {" - "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\"," - "\"name\": \"NiFi Flow\"," - "\"remoteSiteListeningPort\": 10001," - "\"siteToSiteSecure\": false" - "}}"; + "\"revision\": {" + "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\"" + "}," + "\"controller\": {" + "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\"," + "\"name\": \"NiFi Flow\"," + "\"remoteSiteListeningPort\": 10001," + "\"siteToSiteSecure\": false" + "}}"; mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " - "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", - site2site_rest_resp.length()); + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + site2site_rest_resp.length()); mg_printf(conn, "%s", site2site_rest_resp.c_str()); return true; } @@ -71,7 +71,7 @@ int main(int argc, char **argv) { LogTestController::getInstance().setInfo<minifi::FlowController>(); const char *options[] = { "document_root", ".", "listening_ports", "8082", 0 }; - std::vector < std::string > cpp_options; + std::vector<std::string> cpp_options; for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { cpp_options.push_back(options[i]); } @@ -106,28 +106,31 @@ int main(int argc, char **argv) { TestFlowRepository>(); configuration->set(minifi::Configure::nifi_flow_configuration_file, - test_file_location); + test_file_location); std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared - < minifi::io::StreamFactory > (configuration); - std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr - < core::YamlConfiguration - > (new core::YamlConfiguration(test_repo, test_repo, stream_factory, - configuration, test_file_location)); - std::shared_ptr<TestRepository> repo = std::static_pointer_cast - < TestRepository > (test_repo); + <minifi::io::StreamFactory>(configuration); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - std::shared_ptr<minifi::FlowController> controller = - std::make_shared < minifi::FlowController - > (test_repo, test_flow_repo, configuration, std::move(yaml_ptr), DEFAULT_ROOT_GROUP_NAME, true); + content_repo->initialize(configuration); - core::YamlConfiguration yaml_config(test_repo, test_repo, stream_factory, - configuration, test_file_location); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( + new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); + + std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), + content_repo, + DEFAULT_ROOT_GROUP_NAME, + true); + + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, + configuration, + test_file_location); std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot( - test_file_location); - std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr < core::ProcessGroup - > (ptr.get()); + test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup + >(ptr.get()); ptr.release(); controller->load(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/integration/TestExecuteProcess.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/TestExecuteProcess.cpp b/libminifi/test/integration/TestExecuteProcess.cpp index ef0d113..5506c32 100644 --- a/libminifi/test/integration/TestExecuteProcess.cpp +++ b/libminifi/test/integration/TestExecuteProcess.cpp @@ -27,7 +27,7 @@ #include <memory> #include <vector> #include <fstream> - +#include "core/repository/VolatileContentRepository.h" #include "../unit/ProvenanceTestHelper.h" #include "FlowController.h" #include "processors/GetFile.h" @@ -47,15 +47,17 @@ int main(int argc, char **argv) { std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::ExecuteProcess>("executeProcess"); processor->setMaxConcurrentTasks(1); - std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); - - std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); - std::shared_ptr<minifi::FlowController> controller = std::make_shared<TestFlowController>(test_repo, test_repo); + std::shared_ptr<core::Repository> test_repo = + std::make_shared<TestRepository>(); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + std::shared_ptr<TestRepository> repo = + std::static_pointer_cast<TestRepository>(test_repo); + std::shared_ptr<minifi::FlowController> controller = std::make_shared< + TestFlowController>(test_repo, test_repo, content_repo); uuid_t processoruuid; assert(true == processor->getUUID(processoruuid)); - - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, "executeProcessConnection"); + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, content_repo, "executeProcessConnection"); connection->setRelationship(core::Relationship("success", "description")); // link the connections so that we can test results at the end for this @@ -79,7 +81,7 @@ int main(int argc, char **argv) { core::ProcessorNode node2(processor); std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - std::shared_ptr<core::ProcessContext> contextset = std::make_shared<core::ProcessContext>(node2, controller_services_provider, test_repo); + std::shared_ptr<core::ProcessContext> contextset = std::make_shared<core::ProcessContext>(node2, controller_services_provider, test_repo, test_repo); core::ProcessSessionFactory factory(contextset.get()); processor->onSchedule(contextset.get(), &factory); @@ -87,7 +89,7 @@ int main(int argc, char **argv) { processor_workers.push_back(std::thread([processor, test_repo, &is_ready]() { core::ProcessorNode node(processor); std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, test_repo); + std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, test_repo, test_repo); context->setProperty(org::apache::nifi::minifi::processors::ExecuteProcess::Command, "sleep 0.5"); std::shared_ptr<core::ProcessSession> session = std::make_shared<core::ProcessSession>(context.get()); while (!is_ready.load(std::memory_order_relaxed)) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/resources/TestHTTPGet.yml ---------------------------------------------------------------------- diff --git a/libminifi/test/resources/TestHTTPGet.yml b/libminifi/test/resources/TestHTTPGet.yml index 2f64f2a..58f95d9 100644 --- a/libminifi/test/resources/TestHTTPGet.yml +++ b/libminifi/test/resources/TestHTTPGet.yml @@ -32,7 +32,7 @@ Processors: auto-terminated relationships list: Properties: HTTP Method: GET - Remote URL: https://raw.githubusercontent.com/curl/curl/master/docs/examples/httpput.c + Remote URL: http://localhost:10003/geturl - name: OhJeez id: 2438e3c8-015a-1000-79ca-83af40ec1992 class: org.apache.nifi.processors.standard.LogAttribute http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/resources/TestHTTPGetSecure.yml ---------------------------------------------------------------------- diff --git a/libminifi/test/resources/TestHTTPGetSecure.yml b/libminifi/test/resources/TestHTTPGetSecure.yml index f3a23e5..9d19632 100644 --- a/libminifi/test/resources/TestHTTPGetSecure.yml +++ b/libminifi/test/resources/TestHTTPGetSecure.yml @@ -33,7 +33,7 @@ Processors: Properties: SSL Context Service: SSLContextService HTTP Method: GET - Remote URL: https://raw.githubusercontent.com/curl/curl/master/docs/examples/httpput.c + Remote URL: https://raw.githubusercontent.com/apache/nifi-minifi-cpp/master/docs/minifi-logo.png - name: OhJeez id: 2438e3c8-015a-1000-79ca-83af40ec1992 class: org.apache.nifi.processors.standard.LogAttribute http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/resources/TestHTTPPost.yml ---------------------------------------------------------------------- diff --git a/libminifi/test/resources/TestHTTPPost.yml b/libminifi/test/resources/TestHTTPPost.yml index 837194d..c76069a 100644 --- a/libminifi/test/resources/TestHTTPPost.yml +++ b/libminifi/test/resources/TestHTTPPost.yml @@ -46,7 +46,7 @@ Processors: auto-terminated relationships list: response Properties: HTTP Method: POST - Remote URL: http://requestb.in/u8ax9uu8 + Remote URL: http://localhost:10003/urlofchampions - name: Loggit id: 2438e3c8-015a-1000-79ca-83af40ec1993 http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/resources/cn.ckey.pem ---------------------------------------------------------------------- diff --git a/libminifi/test/resources/cn.ckey.pem b/libminifi/test/resources/cn.ckey.pem index 23017fa..fc42f06 100644 --- a/libminifi/test/resources/cn.ckey.pem +++ b/libminifi/test/resources/cn.ckey.pem @@ -1,5 +1,4 @@ Bag Attributes - friendlyName: nifi-key localKeyID: 73 E6 90 32 31 08 F5 87 C2 CE 8D 17 10 32 05 F2 95 6A 9E 9C Key Attributes: <No Attributes> -----BEGIN RSA PRIVATE KEY----- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/resources/cn.crt.pem ---------------------------------------------------------------------- diff --git a/libminifi/test/resources/cn.crt.pem b/libminifi/test/resources/cn.crt.pem index 3a786db..60a38ac 100644 --- a/libminifi/test/resources/cn.crt.pem +++ b/libminifi/test/resources/cn.crt.pem @@ -1,5 +1,4 @@ Bag Attributes - friendlyName: nifi-key localKeyID: 73 E6 90 32 31 08 F5 87 C2 CE 8D 17 10 32 05 F2 95 6A 9E 9C subject=/OU=NIFI/CN=test issuer=/OU=NIFI/CN=localhost http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/resources/nifi-cert.pem ---------------------------------------------------------------------- diff --git a/libminifi/test/resources/nifi-cert.pem b/libminifi/test/resources/nifi-cert.pem index 4e404cd..0c3b7da 100644 --- a/libminifi/test/resources/nifi-cert.pem +++ b/libminifi/test/resources/nifi-cert.pem @@ -18,3 +18,30 @@ lvrRtWOqyGHiRoaRE5+VUjyO+0ToEgj9E+3rV8JL66BT7SWQusLGqbX1OoANCMTj BRYeqB0g0PrXU+6chh6StpNSnYzkQdoxLUIDYYZx2XGsbkjDh/k6ni6bgJEKEOCu T3Z2tyvGpc+PjLRXW/WyXCpg/xfr3+GSVKI6ark= -----END CERTIFICATE----- +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAwCF6Tchue7tR66BPg886WOYNPgSwNaq1KJQSuGcEHK2wlAEu +YfiYz9LbjFLZRLRY2CF9mIGb683byrnvOMcq6a+YdXDaOHZnkKBSsI/xTzScXTv3 +EKSueZ0sMuD7L0y/2Cs2lf8heBUEUqmNe15J9yvEQ1GpJ0j7iCCneKYjjezFWglR +Sv/9suvqVCxIxr4j9gXODgyU3wdwIxkQUBJXk4GtDp03Rxcx6Ch0VBwjcGkYHhcs +GHRzg6dcr795tLfOQNA/Vlje0+RtH/KU/WXgzl9nKtxD7XUwZyhoElzNcehN0WmK +DgAmASncvy7+YYzKU69H14Q+2n/apdoqx/kTQQIDAQABAoIBAQCz7eY69+y4BXo3 +nz84Ipby8CcQoJVg/QiBAwLxHNCWBvdp9B069PQvFLo1FNWSaQ8XAW48p4yc7YHb +vftRgfwnMyIlQdWrsP9WSz6FSZhkY9HX4rODK6aWD+J3l4jFCCxVxkpteKwgaBZP +T6hHE8tTJfK8VLqEJu4g0uvjqjt7ydJT69lThdyf3VE0v6ZeSjsya5qqw+9RK+uC +q5T/8FxeFZgpfR6UXXnoLAmAkfcMZNIBo6cOJWi/BQHjZdpCOVXUBtu0/lC8bffa +4/ESaxRS8kOp+WEb64pT7u6F7yhD/kve6ZnJj/SX1EvN+RzB3zoVG42WUs/+/SwN +dU1ERz+tAoGBAPbgZPDnWuKxW7Cam/Aqmvux624C1lNfhfXEGURhyc+wHWjjhWRe +2vEPJOVxG5pN/FAo+lFoGiLe3QsLRLPlQrGfT/92W28QEcRrRSutjRZOL3wKezQA +DkAPU9HX3lACR5yQD6+a0HHgMr1MqeNFPi9MPPjywGywTyWzHd4WQqvTAoGBAMc7 +J4fpr5uPVq9mKemK67i7meJ8AxjjU7oNe8EN+2XfCYcQUmgIo+dLzV9+DTrYkoTz +iqjA6Ph2DNs6YHI/JNwsdSbAz6KVDteimt3t+uyNpiMGuyLmfOgpYEMJcHp+q6I6 +7PGKVS4c5iPFiYuIo23Is9ZMxOVQp76+UOy09rwbAoGBAOM5Za7VQjGkTGAf7ab/ +j+ZZu/dlZR8XrJSoCRmHZ9hgoLEJuJzJMXruFWeY028SmEivbrW+u0+dEJY5qOJr +ARe7KkZXCZEPmUrP8Lpi4pjFHa9tdjhGVNdhRCTAKz442vCfJ9DZDUHCuPDCvxsP +gEzIPtZjl/hxzmdElRj0JClBAoGAaXmfzAyjs6+HLQThW4r4kKyBI66T1TFEulM5 +GVPVrHEQEjlJ51nrrCAtckjBqE3QBCMLXZwDusaEt+uH8/QKB6Zhv0qEooZXfUHQ +y32aQnIbap+9oxRzPFXraJIuwisdop2fo6Cgx/D0xitmTkDghNaknue1tdGlfQ40 +uZx0o9ECgYBeKeNbMnWoO46ZOrhaz8On+fIY7xtboV2bALy7lvUbWd9B41ntqYUm +NHlYXDDU+Izs5wnNJnNnx4vECuUzYbpeY82dvMewlQwfl5aiyKrjo7VxLm//2U/K +hlID6DU5wi9O+TAQ319DhxT7Ja+AQxO/OFS/mfrtwJEevxXqJLu55Q== +-----END RSA PRIVATE KEY----- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/unit/FileStreamTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/FileStreamTests.cpp b/libminifi/test/unit/FileStreamTests.cpp new file mode 100644 index 0000000..5c86f19 --- /dev/null +++ b/libminifi/test/unit/FileStreamTests.cpp @@ -0,0 +1,210 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#define CATCH_CONFIG_MAIN // This tells Catch to provide a main() - only do this in one cpp file +#include "io/FileStream.h" +#include <string> +#include <vector> +#include <uuid/uuid.h> +#include "../TestBase.h" + +TEST_CASE("TestFileOverWrite", "[TestFiles]") { + TestController testController; + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + std::string path = ss.str(); + file.open(path, std::ios::out); + file << "tempFile"; + file.close(); + + minifi::io::FileStream stream(path, 0, true); + std::vector<uint8_t> readBuffer; + REQUIRE(stream.readData(readBuffer, stream.getSize()) == stream.getSize()); + + uint8_t* data = readBuffer.data(); + + REQUIRE(std::string(reinterpret_cast<char*>(data), readBuffer.size()) == "tempFile"); + + stream.seek(4); + + stream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("file")), 4); + + stream.seek(0); + + std::vector<uint8_t> verifybuffer; + + REQUIRE(stream.readData(verifybuffer, stream.getSize()) == stream.getSize()); + + data = verifybuffer.data(); + + REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == "tempfile"); + + unlink(ss.str().c_str()); +} + +TEST_CASE("TestFileBadArgumentNoChange", "[TestLoader]") { + TestController testController; + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + std::string path = ss.str(); + file.open(path, std::ios::out); + file << "tempFile"; + file.close(); + + minifi::io::FileStream stream(path, 0, true); + std::vector<uint8_t> readBuffer; + REQUIRE(stream.readData(readBuffer, stream.getSize()) == stream.getSize()); + + uint8_t* data = readBuffer.data(); + + REQUIRE(std::string(reinterpret_cast<char*>(data), readBuffer.size()) == "tempFile"); + + stream.seek(4); + + stream.write(reinterpret_cast<uint8_t*>(const_cast<char*>("file")), 0); + + stream.seek(0); + + std::vector<uint8_t> verifybuffer; + + REQUIRE(stream.readData(verifybuffer, stream.getSize()) == stream.getSize()); + + data = verifybuffer.data(); + + REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == "tempFile"); + + unlink(ss.str().c_str()); +} + +TEST_CASE("TestFileBadArgumentNoChange2", "[TestLoader]") { + TestController testController; + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + std::string path = ss.str(); + file.open(path, std::ios::out); + file << "tempFile"; + file.close(); + + minifi::io::FileStream stream(path, 0, true); + std::vector<uint8_t> readBuffer; + REQUIRE(stream.readData(readBuffer, stream.getSize()) == stream.getSize()); + + uint8_t* data = readBuffer.data(); + + REQUIRE(std::string(reinterpret_cast<char*>(data), readBuffer.size()) == "tempFile"); + + stream.seek(4); + + stream.write(nullptr, 0); + + stream.seek(0); + + std::vector<uint8_t> verifybuffer; + + REQUIRE(stream.readData(verifybuffer, stream.getSize()) == stream.getSize()); + + data = verifybuffer.data(); + + REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == "tempFile"); + + unlink(ss.str().c_str()); +} + +TEST_CASE("TestFileBadArgumentNoChange3", "[TestLoader]") { + TestController testController; + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + std::string path = ss.str(); + file.open(path, std::ios::out); + file << "tempFile"; + file.close(); + + minifi::io::FileStream stream(path, 0, true); + std::vector<uint8_t> readBuffer; + REQUIRE(stream.readData(readBuffer, stream.getSize()) == stream.getSize()); + + uint8_t* data = readBuffer.data(); + + REQUIRE(std::string(reinterpret_cast<char*>(data), readBuffer.size()) == "tempFile"); + + stream.seek(4); + + stream.write(nullptr, 0); + + stream.seek(0); + + std::vector<uint8_t> verifybuffer; + + REQUIRE(stream.readData(nullptr, stream.getSize()) == -1); + + data = verifybuffer.data(); + + REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == ""); + + unlink(ss.str().c_str()); +} + + +TEST_CASE("TestFileBeyondEnd3", "[TestLoader]") { + TestController testController; + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + std::string path = ss.str(); + file.open(path, std::ios::out); + file << "tempFile"; + file.close(); + + minifi::io::FileStream stream(path, 0, true); + std::vector<uint8_t> readBuffer; + REQUIRE(stream.readData(readBuffer, stream.getSize()) == stream.getSize()); + + uint8_t* data = readBuffer.data(); + + REQUIRE(std::string(reinterpret_cast<char*>(data), readBuffer.size()) == "tempFile"); + + stream.seek(0); + + std::vector<uint8_t> verifybuffer; + + REQUIRE(stream.readData(verifybuffer, 8192) == 8); + + data = verifybuffer.data(); + + REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == "tempFile"); + + unlink(ss.str().c_str()); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/unit/InvokeHTTPTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/InvokeHTTPTests.cpp b/libminifi/test/unit/InvokeHTTPTests.cpp index 705ac84..2ef3c17 100644 --- a/libminifi/test/unit/InvokeHTTPTests.cpp +++ b/libminifi/test/unit/InvokeHTTPTests.cpp @@ -25,6 +25,7 @@ #include <string> #include <set> #include "FlowController.h" +#include "io/BaseStream.h" #include "../TestBase.h" #include "processors/GetFile.h" #include "core/Core.h" @@ -35,105 +36,9 @@ #include "core/ProcessSession.h" #include "core/ProcessorNode.h" -TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") { - TestController testController; - LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>(); - - std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); - - std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp"); - - std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp"); - uuid_t processoruuid; - REQUIRE(true == processor->getUUID(processoruuid)); - - uuid_t invokehttp_uuid; - REQUIRE(true == invokehttp->getUUID(invokehttp_uuid)); - - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "getfileCreate2Connection"); - connection->setRelationship(core::Relationship("success", "description")); - - std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, "listenhttp"); - - connection2->setRelationship(core::Relationship("No Retry", "description")); - - // link the connections so that we can test results at the end for this - connection->setSource(processor); - - // link the connections so that we can test results at the end for this - connection->setDestination(invokehttp); - - connection2->setSource(invokehttp); - - connection2->setSourceUUID(invokehttp_uuid); - connection->setSourceUUID(processoruuid); - connection->setDestinationUUID(invokehttp_uuid); - - processor->addConnection(connection); - invokehttp->addConnection(connection); - invokehttp->addConnection(connection2); - - core::ProcessorNode node(processor); - core::ProcessorNode node2(invokehttp); - - std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - core::ProcessContext context(node, controller_services_provider, repo); - core::ProcessContext context2(node2, controller_services_provider, repo); - context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8685"); - context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest"); - - context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST"); - context2.setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8685/testytesttest"); - core::ProcessSession session(&context); - core::ProcessSession session2(&context2); - - REQUIRE(processor->getName() == "listenhttp"); - - core::ProcessSessionFactory factory(&context); - - std::shared_ptr<core::FlowFile> record; - processor->setScheduledState(core::ScheduledState::RUNNING); - processor->onSchedule(&context, &factory); - processor->onTrigger(&context, &session); - - invokehttp->incrementActiveTasks(); - invokehttp->setScheduledState(core::ScheduledState::RUNNING); - core::ProcessSessionFactory factory2(&context2); - invokehttp->onSchedule(&context2, &factory2); - invokehttp->onTrigger(&context2, &session2); - - provenance::ProvenanceReporter *reporter = session.getProvenanceReporter(); - std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents(); - record = session.get(); - REQUIRE(record == nullptr); - REQUIRE(records.size() == 0); - - processor->incrementActiveTasks(); - processor->setScheduledState(core::ScheduledState::RUNNING); - processor->onTrigger(&context, &session); - - reporter = session.getProvenanceReporter(); - - records = reporter->getEvents(); - session.commit(); - - invokehttp->incrementActiveTasks(); - invokehttp->setScheduledState(core::ScheduledState::RUNNING); - invokehttp->onTrigger(&context2, &session2); - - session2.commit(); - records = reporter->getEvents(); - - for (provenance::ProvenanceEventRecord *provEventRecord : records) { - REQUIRE(provEventRecord->getComponentType() == processor->getName()); - } - std::shared_ptr<core::FlowFile> ffr = session2.get(); - REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST")); - LogTestController::getInstance().reset(); -} - TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") { TestController testController; + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>(); std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); @@ -154,16 +59,16 @@ TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") { uuid_t invokehttp_uuid; REQUIRE(true == invokehttp->getUUID(invokehttp_uuid)); - std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, "getfileCreate2Connection"); + std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection"); gcConnection->setRelationship(core::Relationship("success", "description")); - std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, "logattribute"); + std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute"); laConnection->setRelationship(core::Relationship("success", "description")); - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "getfileCreate2Connection"); + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection"); connection->setRelationship(core::Relationship("success", "description")); - std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, "listenhttp"); + std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp"); connection2->setRelationship(core::Relationship("No Retry", "description")); @@ -181,8 +86,8 @@ TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") { core::ProcessorNode node(listenhttp); core::ProcessorNode node2(invokehttp); std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - core::ProcessContext context(node, controller_services_provider, repo); - core::ProcessContext context2(node2, controller_services_provider, repo); + core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo); + core::ProcessContext context2(node2, controller_services_provider, repo, repo, content_repo); context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8686"); context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest"); @@ -242,9 +147,10 @@ class CallBack : public minifi::OutputStreamCallback { } virtual ~CallBack() { } - virtual void process(std::ofstream *stream) { + virtual int64_t process(std::shared_ptr<minifi::io::BaseStream> stream) { + // leaving the typo for posterity sake std::string st = "we're gnna write some test stuff"; - stream->write(st.c_str(), st.length()); + return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(st.c_str())), st.length()); } }; @@ -270,16 +176,18 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") { uuid_t invokehttp_uuid; REQUIRE(true == invokehttp->getUUID(invokehttp_uuid)); - std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, "getfileCreate2Connection"); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + + std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection"); gcConnection->setRelationship(core::Relationship("success", "description")); - std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, "logattribute"); + std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute"); laConnection->setRelationship(core::Relationship("success", "description")); - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "getfileCreate2Connection"); + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection"); connection->setRelationship(core::Relationship("success", "description")); - std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, "listenhttp"); + std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp"); connection2->setRelationship(core::Relationship("No Retry", "description")); @@ -299,8 +207,8 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") { core::ProcessorNode node(invokehttp); core::ProcessorNode node2(listenhttp); std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - core::ProcessContext context(node, controller_services_provider, repo); - core::ProcessContext context2(node2, controller_services_provider, repo); + core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo); + core::ProcessContext context2(node2, controller_services_provider, repo, repo, content_repo); context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8680"); context.setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest"); @@ -317,14 +225,9 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") { CallBack callback; - /* - explicit FlowFileRecord(std::shared_ptr<core::Repository> flow_repository, - std::map<std::string, std::string> attributes, - std::shared_ptr<ResourceClaim> claim = nullptr); - */ std::map<std::string, std::string> attributes; attributes["testy"] = "test"; - std::shared_ptr<minifi::FlowFileRecord> flow = std::make_shared<minifi::FlowFileRecord>(repo, attributes); + std::shared_ptr<minifi::FlowFileRecord> flow = std::make_shared<minifi::FlowFileRecord>(repo, content_repo, attributes); session2.write(flow, &callback); invokehttp->incrementActiveTasks(); @@ -368,3 +271,39 @@ TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") { LogTestController::getInstance().reset(); } +TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") { + TestController testController; + LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>(); + LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::ListenHTTP>(); + LogTestController::getInstance().setInfo<core::Processor>(); + + std::shared_ptr<TestPlan> plan = testController.createPlan(); + std::shared_ptr<core::Processor> processor = plan->addProcessor("ListenHTTP", "listenhttp", core::Relationship("No Retry", "description"), false); + std::shared_ptr<core::Processor> invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp", core::Relationship("success", "description"), true); + + REQUIRE(true == plan->setProperty(processor, org::apache::nifi::minifi::processors::ListenHTTP::Port.getName(), "8685")); + REQUIRE(true == plan->setProperty(processor, org::apache::nifi::minifi::processors::ListenHTTP::BasePath.getName(), "/testytesttest")); + + REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::Method.getName(), "POST")); + REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::URL.getName(), "http://localhost:8685/testytesttest")); + plan->reset(); + testController.runSession(plan, true); + + std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords(); + std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile(); + REQUIRE(record == nullptr); + REQUIRE(records.size() == 0); + + plan->reset(); + testController.runSession(plan, true); + + records = plan->getProvenanceRecords(); + record = plan->getCurrentFlowFile(); + + for (provenance::ProvenanceEventRecord *provEventRecord : records) { + REQUIRE(provEventRecord->getComponentType() == processor->getName()); + } + std::shared_ptr<core::FlowFile> ffr = plan->getCurrentFlowFile(); + REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST")); + LogTestController::getInstance().reset(); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/unit/ProcessorTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp index 9e2d50c..7f34ba4 100644 --- a/libminifi/test/unit/ProcessorTests.cpp +++ b/libminifi/test/unit/ProcessorTests.cpp @@ -23,11 +23,12 @@ #include <vector> #include <set> #include <fstream> -#include "../unit/ProvenanceTestHelper.h" + #include "../TestBase.h" #include "processors/ListenHTTP.h" #include "processors/LogAttribute.h" #include "processors/GetFile.h" +#include "../unit/ProvenanceTestHelper.h" #include "core/Core.h" #include "core/FlowFile.h" #include "core/Processor.h" @@ -42,131 +43,12 @@ TEST_CASE("Test Creation of GetFile", "[getfileCreate]") { REQUIRE(processor->getName() == "processorname"); } -TEST_CASE("Test Find file", "[getfileCreate2]") { - TestController testController; - - std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); - std::shared_ptr<org::apache::nifi::minifi::Configure> configure = std::make_shared<org::apache::nifi::minifi::Configure>(); - - std::shared_ptr<core::Processor> processorReport = std::make_shared<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>( - std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(configure), configure); - - std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); - - std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); - - char format[] = "/tmp/gt.XXXXXX"; - char *dir = testController.createTempDirectory(format); - - uuid_t processoruuid; - REQUIRE(true == processor->getUUID(processoruuid)); - - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, "getfileCreate2Connection"); - connection->setRelationship(core::Relationship("success", "description")); - - // link the connections so that we can test results at the end for this - connection->setSource(processor); - connection->setDestination(processor); - - connection->setSourceUUID(processoruuid); - connection->setDestinationUUID(processoruuid); - - processor->addConnection(connection); - REQUIRE(dir != NULL); - - core::ProcessorNode node(processor); - std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - core::ProcessContext context(node, controller_services_provider, test_repo); - core::ProcessSessionFactory factory(&context); - context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, dir); - core::ProcessSession session(&context); - - processor->onSchedule(&context, &factory); - REQUIRE(processor->getName() == "getfileCreate2"); - - std::shared_ptr<core::FlowFile> record; - processor->setScheduledState(core::ScheduledState::RUNNING); - processor->onTrigger(&context, &session); - - provenance::ProvenanceReporter *reporter = session.getProvenanceReporter(); - std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents(); - record = session.get(); - REQUIRE(record == nullptr); - REQUIRE(records.size() == 0); - - std::fstream file; - std::stringstream ss; - ss << dir << "/" << "tstFile.ext"; - file.open(ss.str(), std::ios::out); - file << "tempFile"; - file.close(); - - processor->incrementActiveTasks(); - processor->setScheduledState(core::ScheduledState::RUNNING); - processor->onTrigger(&context, &session); - unlink(ss.str().c_str()); - reporter = session.getProvenanceReporter(); - - REQUIRE(processor->getName() == "getfileCreate2"); - - records = reporter->getEvents(); - - for (provenance::ProvenanceEventRecord *provEventRecord : records) { - REQUIRE(provEventRecord->getComponentType() == processor->getName()); - } - session.commit(); - std::shared_ptr<core::FlowFile> ffr = session.get(); - - ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); - REQUIRE(2 == repo->getRepoMap().size()); - - for (auto entry : repo->getRepoMap()) { - provenance::ProvenanceEventRecord newRecord; - newRecord.DeSerialize(reinterpret_cast<uint8_t*>(const_cast<char*>(entry.second.data())), entry.second.length()); - - bool found = false; - for (auto provRec : records) { - if (provRec->getEventId() == newRecord.getEventId()) { - REQUIRE(provRec->getEventId() == newRecord.getEventId()); - REQUIRE(provRec->getComponentId() == newRecord.getComponentId()); - REQUIRE(provRec->getComponentType() == newRecord.getComponentType()); - REQUIRE(provRec->getDetails() == newRecord.getDetails()); - REQUIRE(provRec->getEventDuration() == newRecord.getEventDuration()); - found = true; - break; - } - } - if (!found) { - throw std::runtime_error("Did not find record"); - } - } - - core::ProcessorNode nodeReport(processorReport); - core::ProcessContext contextReport(nodeReport, controller_services_provider, test_repo); - core::ProcessSessionFactory factoryReport(&contextReport); - core::ProcessSession sessionReport(&contextReport); - processorReport->onSchedule(&contextReport, &factoryReport); - std::shared_ptr<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask> taskReport = std::static_pointer_cast< - org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(processorReport); - taskReport->setBatchSize(1); - std::vector<std::shared_ptr<provenance::ProvenanceEventRecord>> recordsReport; - processorReport->incrementActiveTasks(); - processorReport->setScheduledState(core::ScheduledState::RUNNING); - std::string jsonStr; - repo->getProvenanceRecord(recordsReport, 1); - taskReport->getJsonReport(&contextReport, &sessionReport, recordsReport, jsonStr); - REQUIRE(recordsReport.size() == 1); - REQUIRE(taskReport->getName() == std::string(org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask::ReportTaskName)); - REQUIRE(jsonStr.find("\"componentType\" : \"getfileCreate2\"") != std::string::npos); -} - TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") { TestController testController; - + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); - std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); char format[] = "/tmp/gt.XXXXXX"; @@ -175,7 +57,8 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") { uuid_t processoruuid; REQUIRE(true == processor->getUUID(processoruuid)); - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, "getfileCreate2Connection"); + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(test_repo, content_repo, "getfileCreate2Connection"); + connection->setRelationship(core::Relationship("success", "description")); // link the connections so that we can test results at the end for this @@ -190,7 +73,7 @@ TEST_CASE("Test GetFileLikeIt'sThreaded", "[getfileCreate3]") { core::ProcessorNode node(processor); std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - core::ProcessContext context(node, controller_services_provider, test_repo); + core::ProcessContext context(node, controller_services_provider, test_repo, test_repo); core::ProcessSessionFactory factory(&context); context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, dir); // replicate 10 threads @@ -245,71 +128,59 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") { TestController testController; LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); - std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>(); - - std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); + std::shared_ptr<TestPlan> plan = testController.createPlan(); + std::shared_ptr<core::Processor> getfile = plan->addProcessor("GetFile", "getfileCreate2"); - std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); + plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true); char format[] = "/tmp/gt.XXXXXX"; char *dir = testController.createTempDirectory(format); - uuid_t processoruuid; - REQUIRE(true == processor->getUUID(processoruuid)); - - uuid_t logattribute_uuid; - REQUIRE(true == logAttribute->getUUID(logattribute_uuid)); - - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "getfileCreate2Connection"); - connection->setRelationship(core::Relationship("success", "description")); - - std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, "logattribute"); - connection2->setRelationship(core::Relationship("success", "description")); - - // link the connections so that we can test results at the end for this - connection->setSource(processor); - - // link the connections so that we can test results at the end for this - connection->setDestination(logAttribute); - - connection2->setSource(logAttribute); - - connection2->setSourceUUID(logattribute_uuid); - connection->setSourceUUID(processoruuid); - connection->setDestinationUUID(logattribute_uuid); + plan->setProperty(getfile, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir); + testController.runSession(plan, false); + std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords(); + std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile(); + REQUIRE(record == nullptr); + REQUIRE(records.size() == 0); - processor->addConnection(connection); - logAttribute->addConnection(connection); - logAttribute->addConnection(connection2); - REQUIRE(dir != NULL); + std::fstream file; + std::stringstream ss; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + plan->reset(); + testController.runSession(plan, false); - core::ProcessorNode node(processor); - core::ProcessorNode node2(logAttribute); - std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - core::ProcessContext context(node, controller_services_provider, repo); - core::ProcessContext context2(node2, controller_services_provider, repo); - context.setProperty(org::apache::nifi::minifi::processors::GetFile::Directory, dir); - core::ProcessSession session(&context); - core::ProcessSession session2(&context2); + unlink(ss.str().c_str()); - REQUIRE(processor->getName() == "getfileCreate2"); + records = plan->getProvenanceRecords(); + record = plan->getCurrentFlowFile(); + testController.runSession(plan, false); - std::shared_ptr<core::FlowFile> record; - processor->setScheduledState(core::ScheduledState::RUNNING); + records = plan->getProvenanceRecords(); + record = plan->getCurrentFlowFile(); - core::ProcessSessionFactory factory(&context); - processor->onSchedule(&context, &factory); - processor->onTrigger(&context, &session); + REQUIRE(true == LogTestController::getInstance().contains("key:absolute.path value:" + ss.str())); + REQUIRE(true == LogTestController::getInstance().contains("Size:8 Offset:0")); + REQUIRE(true == LogTestController::getInstance().contains("key:path value:" + std::string(dir))); + LogTestController::getInstance().reset(); +} - logAttribute->incrementActiveTasks(); - logAttribute->setScheduledState(core::ScheduledState::RUNNING); - core::ProcessSessionFactory factory2(&context2); - logAttribute->onSchedule(&context2, &factory2); - logAttribute->onTrigger(&context2, &session2); +TEST_CASE("Test Find file", "[getfileCreate3]") { + TestController testController; + std::shared_ptr<TestPlan> plan = testController.createPlan(); + std::shared_ptr<core::Processor> processor = plan->addProcessor("GetFile", "getfileCreate2"); + std::shared_ptr<core::Processor> processorReport = std::make_shared<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>( + std::make_shared<org::apache::nifi::minifi::io::StreamFactory>(std::make_shared<org::apache::nifi::minifi::Configure>()), std::make_shared<org::apache::nifi::minifi::Configure>()); + plan->addProcessor(processorReport, "reporter", core::Relationship("success", "description"), false); + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); - provenance::ProvenanceReporter *reporter = session.getProvenanceReporter(); - std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents(); - record = session.get(); + plan->setProperty(processor, org::apache::nifi::minifi::processors::GetFile::Directory.getName(), dir); + testController.runSession(plan, false); + std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords(); + std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile(); REQUIRE(record == nullptr); REQUIRE(records.size() == 0); @@ -319,26 +190,58 @@ TEST_CASE("LogAttributeTest", "[getfileCreate3]") { file.open(ss.str(), std::ios::out); file << "tempFile"; file.close(); + plan->reset(); + testController.runSession(plan, false); - processor->incrementActiveTasks(); - processor->setScheduledState(core::ScheduledState::RUNNING); - processor->onTrigger(&context, &session); - unlink(ss.str().c_str()); - reporter = session.getProvenanceReporter(); - - records = reporter->getEvents(); - session.commit(); - - logAttribute->incrementActiveTasks(); - logAttribute->setScheduledState(core::ScheduledState::RUNNING); - logAttribute->onTrigger(&context2, &session2); + records = plan->getProvenanceRecords(); + record = plan->getCurrentFlowFile(); + for (provenance::ProvenanceEventRecord *provEventRecord : records) { + REQUIRE(provEventRecord->getComponentType() == processor->getName()); + } + std::shared_ptr<core::FlowFile> ffr = plan->getCurrentFlowFile(); + REQUIRE(ffr != nullptr); + ffr->getResourceClaim()->decreaseFlowFileRecordOwnedCount(); + auto repo = std::static_pointer_cast<TestRepository>(plan->getProvenanceRepo()); + REQUIRE(2 == repo->getRepoMap().size()); - records = reporter->getEvents(); + for (auto entry : repo->getRepoMap()) { + provenance::ProvenanceEventRecord newRecord; + newRecord.DeSerialize(reinterpret_cast<uint8_t*>(const_cast<char*>(entry.second.data())), entry.second.length()); - REQUIRE(true == LogTestController::getInstance().contains("key:absolute.path value:" + ss.str())); - REQUIRE(true == LogTestController::getInstance().contains("Size:8 Offset:0")); - REQUIRE(true == LogTestController::getInstance().contains("key:path value:" + std::string(dir))); - LogTestController::getInstance().reset(); + bool found = false; + for (auto provRec : records) { + if (provRec->getEventId() == newRecord.getEventId()) { + REQUIRE(provRec->getEventId() == newRecord.getEventId()); + REQUIRE(provRec->getComponentId() == newRecord.getComponentId()); + REQUIRE(provRec->getComponentType() == newRecord.getComponentType()); + REQUIRE(provRec->getDetails() == newRecord.getDetails()); + REQUIRE(provRec->getEventDuration() == newRecord.getEventDuration()); + found = true; + break; + } + } + if (!found) { + throw std::runtime_error("Did not find record"); + } + } + std::shared_ptr<org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask> taskReport = std::static_pointer_cast< + org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask>(processorReport); + taskReport->setBatchSize(1); + std::vector<std::shared_ptr<core::SerializableComponent>> recordsReport; + recordsReport.push_back(std::make_shared<provenance::ProvenanceEventRecord>()); + processorReport->incrementActiveTasks(); + processorReport->setScheduledState(core::ScheduledState::RUNNING); + std::string jsonStr; + std::size_t deserialized = 0; + repo->DeSerialize(recordsReport, deserialized); + std::function<void(core::ProcessContext*, core::ProcessSession*)> verifyReporter = [&](core::ProcessContext *context, core::ProcessSession *session) { + taskReport->getJsonReport(context, session, recordsReport, jsonStr); + REQUIRE(recordsReport.size() == 1); + REQUIRE(taskReport->getName() == std::string(org::apache::nifi::minifi::core::reporting::SiteToSiteProvenanceReportingTask::ReportTaskName)); + REQUIRE(jsonStr.find("\"componentType\" : \"getfileCreate2\"") != std::string::npos); + }; + + testController.runSession(plan, false, verifyReporter); } int fileSize(const char *add) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/unit/ProvenanceTestHelper.h ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index 17e6078..1b39700 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -18,11 +18,22 @@ #ifndef LIBMINIFI_TEST_UNIT_PROVENANCETESTHELPER_H_ #define LIBMINIFI_TEST_UNIT_PROVENANCETESTHELPER_H_ -#include "provenance/Provenance.h" -#include "FlowController.h" -#include "core/Repository.h" -#include "core/repository/FlowFileRepository.h" -#include "core/Core.h" +#include <atomic> +#include <cstdint> +#include <iostream> +#include <map> +#include <memory> +#include <string> +#include <utility> +#include <vector> +#include "core/repository/VolatileContentRepository.h" +#include "../../include/core/Processor.h" +#include "../../include/core/repository/FlowFileRepository.h" +#include "../../include/Connection.h" +#include "../../include/FlowController.h" +#include "../../include/properties/Configure.h" +#include "../../include/provenance/Provenance.h" + /** * Test repository */ @@ -41,17 +52,22 @@ class TestRepository : public core::Repository { } - bool Put(std::string key, uint8_t *buf, int bufLen) { + bool Put(std::string key, const uint8_t *buf, size_t bufLen) { repositoryResults.insert(std::pair<std::string, std::string>(key, std::string((const char*) buf, bufLen))); return true; } + + virtual bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) { + return Put(key, buffer, bufferSize); + } + // Delete bool Delete(std::string key) { repositoryResults.erase(key); return true; } // Get - bool Get(std::string key, std::string &value) { + bool Get(const std::string &key, std::string &value) { auto result = repositoryResults.find(key); if (result != repositoryResults.end()) { value = result->second; @@ -61,6 +77,39 @@ class TestRepository : public core::Repository { } } + virtual bool Serialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t max_size) { + return false; + } + + virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) { + max_size = 0; + for (auto entry : repositoryResults) { + std::shared_ptr<core::SerializableComponent> eventRead = store.at(max_size); + + if (eventRead->DeSerialize((uint8_t*) entry.second.data(), entry.second.length())) { + } + if (+max_size >= store.size()) { + break; + } + } + return true; + } + + virtual bool Serialize(const std::shared_ptr<core::SerializableComponent> &store) { + return false; + } + + virtual bool DeSerialize(const std::shared_ptr<core::SerializableComponent> &store) { + std::string value; + Get(store->getUUIDStr(), value); + store->DeSerialize(reinterpret_cast<uint8_t*>(const_cast<char*>(value.c_str())), value.size()); + return true; + } + + virtual bool DeSerialize(const uint8_t *buffer, const size_t bufferSize) { + return false; + } + const std::map<std::string, std::string> &getRepoMap() const { return repositoryResults; } @@ -134,6 +183,9 @@ class TestFlowRepository : public core::repository::FlowFileRepository { } } } + + void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) { + } void run() { // do nothing @@ -145,8 +197,8 @@ class TestFlowRepository : public core::repository::FlowFileRepository { class TestFlowController : public minifi::FlowController { public: - TestFlowController(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo) - : minifi::FlowController(repo, flow_file_repo, std::make_shared<minifi::Configure>(), nullptr, "", true) { + TestFlowController(std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<core::ContentRepository> content_repo) + : minifi::FlowController(repo, flow_file_repo,std::make_shared<minifi::Configure>(), nullptr, std::make_shared<core::repository::VolatileContentRepository>(), "", true) { } ~TestFlowController() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/unit/ProvenanceTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProvenanceTests.cpp b/libminifi/test/unit/ProvenanceTests.cpp index 6a58597..97cb646 100644 --- a/libminifi/test/unit/ProvenanceTests.cpp +++ b/libminifi/test/unit/ProvenanceTests.cpp @@ -26,8 +26,8 @@ #include "provenance/Provenance.h" #include "FlowFileRecord.h" #include "core/Core.h" -#include "core/repository/FlowFileRepository.h" -#include "core/repository/VolatileRepository.h" +#include "core/repository/AtomicRepoEntries.h" +#include "core/repository/VolatileProvenanceRepository.h" TEST_CASE("Test Provenance record create", "[Testprovenance::ProvenanceEventRecord]") { provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "blah", "blahblah"); @@ -49,7 +49,8 @@ TEST_CASE("Test Provenance record serialization", "[Testprovenance::ProvenanceEv record1.Serialize(testRepository); provenance::ProvenanceEventRecord record2; - REQUIRE(record2.DeSerialize(testRepository, eventId) == true); + record2.setEventId(eventId); + REQUIRE(record2.DeSerialize(testRepository) == true); REQUIRE(record2.getEventId() == record1.getEventId()); REQUIRE(record2.getComponentId() == record1.getComponentId()); REQUIRE(record2.getComponentType() == record1.getComponentType()); @@ -60,12 +61,13 @@ TEST_CASE("Test Provenance record serialization", "[Testprovenance::ProvenanceEv TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") { provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype"); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); std::string eventId = record1.getEventId(); std::map<std::string, std::string> attributes; attributes.insert(std::pair<std::string, std::string>("potato", "potatoe")); attributes.insert(std::pair<std::string, std::string>("tomato", "tomatoe")); std::shared_ptr<core::repository::FlowFileRepository> frepo = std::make_shared<core::repository::FlowFileRepository>("ff", "./content_repository", 0, 0, 0); - std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>(frepo, attributes); + std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>(frepo, content_repo, attributes); record1.addChildFlowFile(ffr1); @@ -75,7 +77,8 @@ TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") { record1.Serialize(testRepository); provenance::ProvenanceEventRecord record2; - REQUIRE(record2.DeSerialize(testRepository, eventId) == true); + record2.setEventId(eventId); + REQUIRE(record2.DeSerialize(testRepository) == true); REQUIRE(record1.getChildrenUuids().size() == 1); REQUIRE(record2.getChildrenUuids().size() == 1); std::string childId = record2.getChildrenUuids().at(0); @@ -94,13 +97,14 @@ TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::Pro uint64_t sample = 65555; - std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileRepository>(); + std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileProvenanceRepository>(); testRepository->initialize(0); record1.setEventDuration(sample); record1.Serialize(testRepository); provenance::ProvenanceEventRecord record2; - REQUIRE(record2.DeSerialize(testRepository, eventId) == true); + record2.setEventId(eventId); + REQUIRE(record2.DeSerialize(testRepository) == true); REQUIRE(record2.getEventId() == record1.getEventId()); REQUIRE(record2.getComponentId() == record1.getComponentId()); REQUIRE(record2.getComponentType() == record1.getComponentType()); @@ -111,24 +115,26 @@ TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::Pro TEST_CASE("Test Flowfile record added to provenance using Volatile Repo", "[TestFlowAndProv1]") { provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype"); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); std::string eventId = record1.getEventId(); std::map<std::string, std::string> attributes; attributes.insert(std::pair<std::string, std::string>("potato", "potatoe")); attributes.insert(std::pair<std::string, std::string>("tomato", "tomatoe")); - std::shared_ptr<core::Repository> frepo = std::make_shared<core::repository::VolatileRepository>(); + std::shared_ptr<core::Repository> frepo = std::make_shared<core::repository::VolatileProvenanceRepository>(); frepo->initialize(0); - std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>(frepo, attributes); + std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>(frepo, content_repo, attributes); record1.addChildFlowFile(ffr1); uint64_t sample = 65555; - std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileRepository>(); + std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileProvenanceRepository>(); testRepository->initialize(0); record1.setEventDuration(sample); record1.Serialize(testRepository); provenance::ProvenanceEventRecord record2; - REQUIRE(record2.DeSerialize(testRepository, eventId) == true); + record2.setEventId(eventId); + REQUIRE(record2.DeSerialize(testRepository) == true); REQUIRE(record1.getChildrenUuids().size() == 1); REQUIRE(record2.getChildrenUuids().size() == 1); std::string childId = record2.getChildrenUuids().at(0); @@ -151,7 +157,8 @@ TEST_CASE("Test Provenance record serialization NoOp", "[Testprovenance::Provena testRepository->initialize(0); record1.setEventDuration(sample); - record1.Serialize(testRepository); + REQUIRE(record1.Serialize(testRepository) == true); provenance::ProvenanceEventRecord record2; - REQUIRE(record2.DeSerialize(testRepository, eventId) == false); + record2.setEventId(eventId); + REQUIRE(record2.DeSerialize(testRepository) == false); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/unit/RepoTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/RepoTests.cpp b/libminifi/test/unit/RepoTests.cpp index 4424a93..3b18310 100644 --- a/libminifi/test/unit/RepoTests.cpp +++ b/libminifi/test/unit/RepoTests.cpp @@ -23,7 +23,7 @@ #include "provenance/Provenance.h" #include "FlowFileRecord.h" #include "core/Core.h" -#include "core/repository/FlowFileRepository.h" +#include "../../include/core/repository/AtomicRepoEntries.h" #include "properties/Configure.h" TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") { @@ -34,7 +34,8 @@ TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") { repository->initialize(std::make_shared<minifi::Configure>()); - minifi::FlowFileRecord record(repository); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + minifi::FlowFileRecord record(repository, content_repo); record.addAttribute("keyA", ""); @@ -50,8 +51,8 @@ TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") { std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir, 0, 0, 1); repository->initialize(std::make_shared<minifi::Configure>()); - - minifi::FlowFileRecord record(repository); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + minifi::FlowFileRecord record(repository, content_repo); record.addAttribute("keyA", "hasdgasdgjsdgasgdsgsadaskgasd"); @@ -70,9 +71,10 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") { repository->initialize(std::make_shared<org::apache::nifi::minifi::Configure>()); - minifi::FlowFileRecord record(repository); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + minifi::FlowFileRecord record(repository, content_repo); - minifi::FlowFileRecord record2(repository); + minifi::FlowFileRecord record2(repository, content_repo); std::string uuid = record.getUUIDStr(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fe634853/libminifi/test/unit/TailFileTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/TailFileTests.cpp b/libminifi/test/unit/TailFileTests.cpp index e800b4c..eb33f8c 100644 --- a/libminifi/test/unit/TailFileTests.cpp +++ b/libminifi/test/unit/TailFileTests.cpp @@ -42,130 +42,137 @@ static const char *TMP_FILE = "/tmp/minifi-tmpfile.txt"; static const char *STATE_FILE = "/tmp/minifi-state-file.txt"; TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") { - try { - // Create and write to the test file - std::ofstream tmpfile; - tmpfile.open(TMP_FILE); - tmpfile << NEWLINE_FILE; - tmpfile.close(); + try { + // Create and write to the test file + std::ofstream tmpfile; + tmpfile.open(TMP_FILE); + tmpfile << NEWLINE_FILE; + tmpfile.close(); - TestController testController; - LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>(); + TestController testController; + LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::TailFile>(); + LogTestController::getInstance().setDebug<core::ProcessSession>(); + LogTestController::getInstance().setDebug<core::repository::VolatileContentRepository>(); - std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); + std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); - std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile"); - std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); + std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile"); + std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); - uuid_t processoruuid; - REQUIRE(true == processor->getUUID(processoruuid)); - uuid_t logAttributeuuid; - REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid)); + uuid_t processoruuid; + REQUIRE(true == processor->getUUID(processoruuid)); + uuid_t logAttributeuuid; + REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid)); - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "logattributeconnection"); - connection->setRelationship(core::Relationship("success", "TailFile successful output")); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>()); + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection"); + connection->setRelationship(core::Relationship("success", "TailFile successful output")); - // link the connections so that we can test results at the end for this - connection->setDestination(connection); + // link the connections so that we can test results at the end for this + connection->setDestination(connection); - connection->setSourceUUID(processoruuid); + connection->setSourceUUID(processoruuid); - processor->addConnection(connection); + processor->addConnection(connection); - core::ProcessorNode node(processor); + core::ProcessorNode node(processor); - std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - core::ProcessContext context(node, controller_services_provider, repo); - context.setProperty(org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n"); - context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE); - context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE); + std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; + core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo); + context.setProperty(org::apache::nifi::minifi::processors::TailFile::Delimiter, "\n"); + context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE); + context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE); - core::ProcessSession session(&context); + core::ProcessSession session(&context); - REQUIRE(processor->getName() == "tailfile"); + REQUIRE(processor->getName() == "tailfile"); - core::ProcessSessionFactory factory(&context); + core::ProcessSessionFactory factory(&context); - std::shared_ptr<core::FlowFile> record; - processor->setScheduledState(core::ScheduledState::RUNNING); - processor->onSchedule(&context, &factory); - processor->onTrigger(&context, &session); + std::shared_ptr<core::FlowFile> record; + processor->setScheduledState(core::ScheduledState::RUNNING); + processor->onSchedule(&context, &factory); + processor->onTrigger(&context, &session); - provenance::ProvenanceReporter *reporter = session.getProvenanceReporter(); - std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents(); - record = session.get(); - REQUIRE(record == nullptr); - std::shared_ptr<core::FlowFile> ff = session.get(); - REQUIRE(provRecords.size() == 4); // 2 creates and 2 modifies for flowfiles + provenance::ProvenanceReporter *reporter = session.getProvenanceReporter(); + std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents(); + record = session.get(); + REQUIRE(record == nullptr); + std::shared_ptr<core::FlowFile> ff = session.get(); + REQUIRE(provRecords.size() == 4); // 2 creates and 2 modifies for flowfiles - LogTestController::getInstance().reset(); - } catch (...) { } + LogTestController::getInstance().reset(); + } catch (...) { + } - // Delete the test and state file. - std::remove(TMP_FILE); - std::remove(STATE_FILE); + // Delete the test and state file. + std::remove(TMP_FILE); + std::remove(STATE_FILE); } - TEST_CASE("TailFileWithoutDelimiter", "[tailfiletest2]") { - try { - // Create and write to the test file - std::ofstream tmpfile; - tmpfile.open(TMP_FILE); - tmpfile << NEWLINE_FILE; - tmpfile.close(); + try { + // Create and write to the test file + std::ofstream tmpfile; + tmpfile.open(TMP_FILE); + tmpfile << NEWLINE_FILE; + tmpfile.close(); - TestController testController; - LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>(); + TestController testController; + LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::TailFile>(); - std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); + std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); - std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile"); - std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); + std::shared_ptr<core::Processor> processor = std::make_shared<org::apache::nifi::minifi::processors::TailFile>("tailfile"); + std::shared_ptr<core::Processor> logAttributeProcessor = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); - uuid_t processoruuid; - REQUIRE(true == processor->getUUID(processoruuid)); - uuid_t logAttributeuuid; - REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid)); + uuid_t processoruuid; + REQUIRE(true == processor->getUUID(processoruuid)); + uuid_t logAttributeuuid; + REQUIRE(true == logAttributeProcessor->getUUID(logAttributeuuid)); - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, "logattributeconnection"); - connection->setRelationship(core::Relationship("success", "TailFile successful output")); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + content_repo->initialize(std::make_shared<org::apache::nifi::minifi::Configure>()); + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "logattributeconnection"); + connection->setRelationship(core::Relationship("success", "TailFile successful output")); - // link the connections so that we can test results at the end for this - connection->setDestination(connection); - connection->setSourceUUID(processoruuid); + // link the connections so that we can test results at the end for this + connection->setDestination(connection); + connection->setSourceUUID(processoruuid); - processor->addConnection(connection); + processor->addConnection(connection); - core::ProcessorNode node(processor); + core::ProcessorNode node(processor); - std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - core::ProcessContext context(node, controller_services_provider, repo); - context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE); - context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE); + std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; + core::ProcessContext context(node, controller_services_provider, repo, repo, content_repo); + context.setProperty(org::apache::nifi::minifi::processors::TailFile::FileName, TMP_FILE); + context.setProperty(org::apache::nifi::minifi::processors::TailFile::StateFile, STATE_FILE); - core::ProcessSession session(&context); + core::ProcessSession session(&context); - REQUIRE(processor->getName() == "tailfile"); + REQUIRE(processor->getName() == "tailfile"); - core::ProcessSessionFactory factory(&context); + core::ProcessSessionFactory factory(&context); - std::shared_ptr<core::FlowFile> record; - processor->setScheduledState(core::ScheduledState::RUNNING); - processor->onSchedule(&context, &factory); - processor->onTrigger(&context, &session); + std::shared_ptr<core::FlowFile> record; + processor->setScheduledState(core::ScheduledState::RUNNING); + processor->onSchedule(&context, &factory); + processor->onTrigger(&context, &session); - provenance::ProvenanceReporter *reporter = session.getProvenanceReporter(); - std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents(); - record = session.get(); - REQUIRE(record == nullptr); - std::shared_ptr<core::FlowFile> ff = session.get(); - REQUIRE(provRecords.size() == 2); + provenance::ProvenanceReporter *reporter = session.getProvenanceReporter(); + std::set<provenance::ProvenanceEventRecord*> provRecords = reporter->getEvents(); + record = session.get(); + REQUIRE(record == nullptr); + std::shared_ptr<core::FlowFile> ff = session.get(); + REQUIRE(provRecords.size() == 2); - LogTestController::getInstance().reset(); - } catch (...) { } + LogTestController::getInstance().reset(); + } catch (...) { + } - // Delete the test and state file. - std::remove(TMP_FILE); - std::remove(STATE_FILE); + // Delete the test and state file. + std::remove(TMP_FILE); + std::remove(STATE_FILE); }
