Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 3e23e20fe -> b8e45cbf9
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/HTTPIntegrationBase.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h new file mode 100644 index 0000000..611c11f --- /dev/null +++ b/extensions/http-curl/tests/HTTPIntegrationBase.h @@ -0,0 +1,75 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_TEST_INTEGRATION_HTTPINTEGRATIONBASE_H_ +#define LIBMINIFI_TEST_INTEGRATION_HTTPINTEGRATIONBASE_H_ + +#include "../tests/TestServer.h" +#include "CivetServer.h" +#include "integration/IntegrationBase.h" + +int log_message(const struct mg_connection *conn, const char *message) { + puts(message); + return 1; +} + +int ssl_enable(void *ssl_context, void *user_data) { + struct ssl_ctx_st *ctx = (struct ssl_ctx_st *) ssl_context; + return 0; +} + +class HTTPIntegrationBase : public IntegrationBase { + public: + HTTPIntegrationBase() : IntegrationBase(), server(nullptr) {} + + void setUrl(std::string url, CivetHandler *handler); + + virtual ~HTTPIntegrationBase(); + + protected: + CivetServer *server; +}; + +HTTPIntegrationBase::~HTTPIntegrationBase() { + stop_webserver(server); +} + +void HTTPIntegrationBase::setUrl(std::string url, CivetHandler *handler) { + + parse_http_components(url, port, scheme, path); + struct mg_callbacks callback; + if (url.find("localhost") != std::string::npos) { + + if (server != nullptr){ + server->addHandler(path,handler); + return; + } + if (scheme == "https" && !key_dir.empty()) { + std::string cert = ""; + cert = key_dir + "nifi-cert.pem"; + memset(&callback, 0, sizeof(callback)); + callback.init_ssl = ssl_enable; + port += "s"; + callback.log_message = log_message; + server = start_webserver(port, path, handler, &callback, cert, cert); + } else { + server = start_webserver(port, path, handler); + } + } +} + +#endif /* LIBMINIFI_TEST_INTEGRATION_HTTPINTEGRATIONBASE_H_ */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp new file mode 100644 index 0000000..309492e --- /dev/null +++ b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp 
@@ -0,0 +1,262 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define CURLOPT_SSL_VERIFYPEER_DISABLE 1 +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "HTTPClient.h" +#include "CivetServer.h" +#include "sitetosite/HTTPProtocol.h" +#include "InvokeHTTP.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "io/StreamFactory.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "TestServer.h" +#include "HTTPIntegrationBase.h" +#include "HTTPHandlers.h" +#include "client/HTTPStream.h" + +class SiteToSiteTestHarness : public HTTPIntegrationBase { + public: + explicit SiteToSiteTestHarness(bool isSecure) + : isSecure(isSecure) { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); + } + + void testSetup() { + 
LogTestController::getInstance().setDebug<minifi::RemoteProcessorGroupPort>(); + LogTestController::getInstance().setDebug<minifi::sitetosite::HttpSiteToSiteClient>(); + LogTestController::getInstance().setDebug<minifi::sitetosite::SiteToSiteClient>(); + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setTrace<minifi::controllers::SSLContextService>(); + LogTestController::getInstance().setInfo<minifi::FlowController>(); + LogTestController::getInstance().setDebug<core::ConfigurableComponent>(); + + std::fstream file; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + + configuration->set("nifi.c2.enable", "false"); + configuration->set("nifi.remote.input.http.enabled", "true"); + configuration->set("nifi.remote.input.socket.port", "8082"); + } + + virtual void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(3)); + } + + void cleanup() { + unlink(ss.str().c_str()); + } + + void runAssertions() { + } + + protected: + bool isSecure; + char *dir; + std::stringstream ss; + TestController testController; +}; + +struct test_profile { + test_profile() + : flow_url_broken(false), + transaction_url_broken(false), + empty_transaction_url(false), + no_delete(false), + invalid_checksum(false) { + } + + bool allFalse() const { + return !flow_url_broken && !transaction_url_broken && !empty_transaction_url && !no_delete && !invalid_checksum; + } + // tests for a broken flow file url + bool flow_url_broken; + // transaction url will return incorrect information + bool transaction_url_broken; + // Location will be absent within the + bool empty_transaction_url; + // delete url is not supported. 
+ bool no_delete; + // invalid checksum error + bool invalid_checksum; +}; + +void run_variance(std::string test_file_location, bool isSecure, std::string url, const struct test_profile &profile) { + SiteToSiteTestHarness harness(isSecure); + + SiteToSiteLocationResponder responder(isSecure); + + TransactionResponder transaction_response(url, "471deef6-2a6e-4a7d-912a-81cc17e3a204", true, profile.transaction_url_broken, profile.empty_transaction_url); + + std::string transaction_id = transaction_response.getTransactionId(); + + harness.setKeyDir(""); + + std::string controller_loc = url + "/controller"; + + harness.setUrl(controller_loc, &responder); + + std::string transaction_url = url + "/data-transfer/input-ports/471deef6-2a6e-4a7d-912a-81cc17e3a204/transactions"; + std::string action_url = url + "/site-to-site/input-ports/471deef6-2a6e-4a7d-912a-81cc17e3a204/transactions"; + + std::string transaction_output_url = url + "/data-transfer/output-ports/471deef6-2a6e-4a7d-912a-81cc17e3a203/transactions"; + std::string action_output_url = url + "/site-to-site/output-ports/471deef6-2a6e-4a7d-912a-81cc17e3a203/transactions"; + + harness.setUrl(transaction_url, &transaction_response); + + std::string peer_url = url + "/site-to-site/peers"; + + PeerResponder peer_response(url); + + harness.setUrl(peer_url, &peer_response); + + std::string flow_url = action_url + "/" + transaction_id + "/flow-files"; + + FlowFileResponder flowResponder(true, profile.flow_url_broken, profile.invalid_checksum); + flowResponder.setFlowUrl(flow_url); + auto producedFlows = flowResponder.getFlows(); + + TransactionResponder transaction_response_output(url, "471deef6-2a6e-4a7d-912a-81cc17e3a203", false, profile.transaction_url_broken, profile.empty_transaction_url); + std::string transaction_output_id = transaction_response_output.getTransactionId(); + transaction_response_output.setFeed(producedFlows); + + harness.setUrl(transaction_output_url, &transaction_response_output); + + std::string 
flow_output_url = action_output_url + "/" + transaction_output_id + "/flow-files"; + + FlowFileResponder flowOutputResponder(false, profile.flow_url_broken, profile.invalid_checksum); + flowOutputResponder.setFlowUrl(flow_output_url); + flowOutputResponder.setFeed(producedFlows); + + harness.setUrl(flow_url, &flowResponder); + harness.setUrl(flow_output_url, &flowOutputResponder); + + if (!profile.no_delete) { + std::string delete_url = transaction_url + "/" + transaction_id; + DeleteTransactionResponder deleteResponse(delete_url, "201 OK", 12); + harness.setUrl(delete_url, &deleteResponse); + + std::string delete_output_url = transaction_output_url + "/" + transaction_output_id; + DeleteTransactionResponder deleteOutputResponse(delete_output_url, "201 OK", producedFlows); + harness.setUrl(delete_output_url, &deleteOutputResponse); + } + + harness.run(test_file_location); + + std::stringstream assertStr; + if (profile.allFalse()) { + assertStr << "Site2Site transaction " << transaction_id << " peer finished transaction"; + assert(LogTestController::getInstance().contains(assertStr.str()) == true); + } else if (profile.empty_transaction_url) { + assert(LogTestController::getInstance().contains("Location is empty") == true); + } else if (profile.transaction_url_broken) { + assert(LogTestController::getInstance().contains("Could not create transaction, intent is ohstuff") == true); + } else if (profile.invalid_checksum) { + assertStr << "Site2Site transaction " << transaction_id << " peer confirm transaction with CRC Imawrongchecksumshortandstout"; + assert(LogTestController::getInstance().contains(assertStr.str()) == true); + assertStr.str(std::string()); + assertStr << "Site2Site transaction " << transaction_id << " CRC not matched"; + assert(LogTestController::getInstance().contains(assertStr.str()) == true); + assertStr.str(std::string()); + assertStr << "Site2Site delete transaction " << transaction_id; + 
assert(LogTestController::getInstance().contains(assertStr.str()) == true); + } else if (profile.no_delete) { + assert(LogTestController::getInstance().contains("Received 401 response code from delete") == true); + } else { + assertStr << "Site2Site transaction " << transaction_id << " peer unknown respond code 254"; + assert(LogTestController::getInstance().contains(assertStr.str()) == true); + } + LogTestController::getInstance().reset(); +} + +int main(int argc, char **argv) { + transaction_id = 0; + transaction_id_output = 0; + std::string key_dir, test_file_location, url; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + url = argv[3]; + } + + bool isSecure = false; + if (url.find("https") != std::string::npos) { + isSecure = true; + } + + { + struct test_profile profile; + run_variance(test_file_location, isSecure, url, profile); + } + + { + struct test_profile profile; + profile.flow_url_broken = true; + run_variance(test_file_location, isSecure, url, profile); + } + + { + struct test_profile profile; + profile.empty_transaction_url = true; + run_variance(test_file_location, isSecure, url, profile); + } + + { + struct test_profile profile; + profile.transaction_url_broken = true; + run_variance(test_file_location, isSecure, url, profile); + } + + { + struct test_profile profile; + profile.no_delete = true; + run_variance(test_file_location, isSecure, url, profile); + } + + { + struct test_profile profile; + profile.invalid_checksum = true; + run_variance(test_file_location, isSecure, url, profile); + } + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/HttpGetIntegrationTest.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp new file mode 100644 index 0000000..df40497 --- /dev/null +++ 
b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp @@ -0,0 +1,162 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define CURLOPT_SSL_VERIFYPEER_DISABLE 1 +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include "HTTPClient.h" +#include "InvokeHTTP.h" +#include "TestServer.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "processors/InvokeHTTP.h" +#include "processors/ListenHTTP.h" +#include "processors/LogAttribute.h" + +void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(10)); +} + +int log_message(const struct mg_connection *conn, const char *message) { + puts(message); + return 1; +} + +int ssl_enable(void *ssl_context, void *user_data) { + struct ssl_ctx_st *ctx = (struct ssl_ctx_st *) ssl_context; + return 0; +} + 
+class HttpResponder : public CivetHandler { + public: + bool handleGet(CivetServer *server, struct mg_connection *conn) { + static const std::string site2site_rest_resp = "hi this is a get test"; + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + site2site_rest_resp.length()); + mg_printf(conn, "%s", site2site_rest_resp.c_str()); + return true; + } +}; + +int main(int argc, char **argv) { + init_webserver(); + LogTestController::getInstance().setDebug<core::Processor>(); + LogTestController::getInstance().setDebug<core::ProcessSession>(); + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setDebug<minifi::controllers::SSLContextService>(); + LogTestController::getInstance().setDebug<minifi::processors::InvokeHTTP>(); + LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); + std::string key_dir, test_file_location; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + } + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); + configuration->set(minifi::Configure::nifi_default_directory, key_dir); + mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + + std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); + std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>(); + + configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); + + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); + + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + + content_repo->initialize(configuration); + + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( + new core::YamlConfiguration(test_repo, 
test_repo, content_repo, stream_factory, configuration, test_file_location)); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); + + std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), + content_repo, + DEFAULT_ROOT_GROUP_NAME, + true); + + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); + + std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get()); + std::shared_ptr<core::Processor> proc = ptr->findProcessor("invoke"); + assert(proc != nullptr); + + std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc); + + assert(inv != nullptr); + std::string url = ""; + inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url); + ptr.release(); + HttpResponder h_ex; + std::string port, scheme, path; + CivetServer *server = nullptr; + + parse_http_components(url, port, scheme, path); + struct mg_callbacks callback; + if (url.find("localhost") != std::string::npos) { + if (scheme == "https") { + std::string cert = ""; + cert = key_dir + "nifi-cert.pem"; + memset(&callback, 0, sizeof(callback)); + callback.init_ssl = ssl_enable; + port +="s"; + callback.log_message = log_message; + server = start_webserver(port, path, &h_ex, &callback, cert, cert); + } else { + server = start_webserver(port, path, &h_ex); + } + } + controller->load(); + controller->start(); + waitToVerifyProcessor(); + + controller->waitUnload(60000); + if (url.find("localhost") == std::string::npos) { + stop_webserver(server); + exit(1); + } + std::string logs = LogTestController::getInstance().log_output.str(); + + assert(logs.find("key:filename value:") != std::string::npos); + 
assert(logs.find("key:invokehttp.request.url value:" + url) != std::string::npos); + assert(logs.find("key:invokehttp.status.code value:200") != std::string::npos); + + LogTestController::getInstance().reset(); + rmdir("./content_repository"); + stop_webserver(server); + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/HttpPostIntegrationTest.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp new file mode 100644 index 0000000..7b5ca97 --- /dev/null +++ b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp @@ -0,0 +1,114 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include "HTTPClient.h" +#include "InvokeHTTP.h" +#include "processors/ListenHTTP.h" +#include "processors/LogAttribute.h" +#include <sstream> +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "CivetServer.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "controllers/SSLContextService.h" +#include "../tests/TestServer.h" +#include "HTTPIntegrationBase.h" + +class HttpTestHarness : public HTTPIntegrationBase { + public: + HttpTestHarness() { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); + } + + void testSetup() { + LogTestController::getInstance().setDebug<minifi::FlowController>(); + LogTestController::getInstance().setDebug<core::ProcessGroup>(); + LogTestController::getInstance().setDebug<minifi::SchedulingAgent>(); + LogTestController::getInstance().setDebug<core::ProcessContext>(); + LogTestController::getInstance().setDebug<processors::InvokeHTTP>(); + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setDebug<processors::ListenHTTP>(); + LogTestController::getInstance().setDebug<processors::ListenHTTP::WriteCallback>(); + LogTestController::getInstance().setDebug<processors::ListenHTTP::Handler>(); + LogTestController::getInstance().setDebug<processors::LogAttribute>(); + LogTestController::getInstance().setDebug<core::Processor>(); + 
LogTestController::getInstance().setDebug<minifi::ThreadedSchedulingAgent>(); + LogTestController::getInstance().setDebug<minifi::TimerDrivenSchedulingAgent>(); + LogTestController::getInstance().setDebug<minifi::core::ProcessSession>(); + std::fstream file; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + configuration->set("nifi.flow.engine.threads", "8"); + configuration->set("nifi.c2.enable", "false"); + } + + void cleanup() { + unlink(ss.str().c_str()); + } + + void runAssertions() { + assert(LogTestController::getInstance().contains("curl performed") == true); + assert(LogTestController::getInstance().contains("Size:1024 Offset:0") == true); + assert(LogTestController::getInstance().contains("Size:0 Offset:0") == false); + } + + protected: + char *dir; + std::stringstream ss; + TestController testController; +}; + +int main(int argc, char **argv) { + std::string key_dir, test_file_location, url; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + } + + HttpTestHarness harness; + + harness.setKeyDir(key_dir); + + harness.run(test_file_location); + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/SiteToSiteRestTest.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/SiteToSiteRestTest.cpp b/extensions/http-curl/tests/SiteToSiteRestTest.cpp new file mode 100644 index 0000000..2cf0955 --- /dev/null +++ b/extensions/http-curl/tests/SiteToSiteRestTest.cpp @@ -0,0 +1,145 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define CURLOPT_SSL_VERIFYPEER_DISABLE 1 +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "HTTPClient.h" +#include "InvokeHTTP.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "CivetServer.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "controllers/SSLContextService.h" +#include "../tests/TestServer.h" +#include "HTTPIntegrationBase.h" + +class Responder : public CivetHandler { + public: + explicit Responder(bool isSecure) + : isSecure(isSecure) { + } + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::string site2site_rest_resp = "{" + "\"revision\": {" + "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\"" + "}," + "\"controller\": {" + "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\"," + "\"name\": \"NiFi Flow\"," + "\"remoteSiteListeningPort\": 10001," + "\"siteToSiteSecure\": "; + site2site_rest_resp += (isSecure ? 
"true" : "false"); + site2site_rest_resp += "}}"; + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + site2site_rest_resp.length()); + mg_printf(conn, "%s", site2site_rest_resp.c_str()); + return true; + } + + protected: + bool isSecure; +}; + +class SiteToSiteTestHarness : public HTTPIntegrationBase { + public: + explicit SiteToSiteTestHarness(bool isSecure) + : isSecure(isSecure) { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); + } + + void testSetup() { + LogTestController::getInstance().setDebug<minifi::RemoteProcessorGroupPort>(); + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setTrace<minifi::controllers::SSLContextService>(); + LogTestController::getInstance().setInfo<minifi::FlowController>(); + LogTestController::getInstance().setDebug<core::ConfigurableComponent>(); + + std::fstream file; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + } + + void cleanup() { + unlink(ss.str().c_str()); + } + + void runAssertions() { + if (isSecure) { + assert(LogTestController::getInstance().contains("process group remote site2site port 10001, is secure 1") == true); + } else { + assert(LogTestController::getInstance().contains("process group remote site2site port 10001, is secure 0") == true); + } + } + + protected: + bool isSecure; + char *dir; + std::stringstream ss; + TestController testController; +}; + +int main(int argc, char **argv) { + std::string key_dir, test_file_location, url; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + url = argv[3]; + } + + bool isSecure = false; + if (url.find("https") != std::string::npos) { + isSecure = true; + } + + SiteToSiteTestHarness harness(isSecure); + + Responder responder(isSecure); + + harness.setKeyDir(key_dir); + + harness.setUrl(url, &responder); + + 
harness.run(test_file_location); + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/TestServer.h ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/TestServer.h b/extensions/http-curl/tests/TestServer.h new file mode 100644 index 0000000..06f996c --- /dev/null +++ b/extensions/http-curl/tests/TestServer.h @@ -0,0 +1,117 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_TEST_TESTSERVER_H_ +#define LIBMINIFI_TEST_TESTSERVER_H_ +#include <regex.h> +#include <string> +#include <iostream> +#include "civetweb.h" +#include "CivetServer.h" + + +/* Server context handle */ +static std::string resp_str; + +void init_webserver() { + mg_init_library(0); +} + + +CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler, struct mg_callbacks *callbacks, std::string &cert, std::string &ca_cert) { + const char *options[] = { "listening_ports", port.c_str(), "error_log_file", + "error.log", "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list", + "ALL", "ssl_verify_peer", "no", 0 }; + + std::vector<std::string> cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + CivetServer *server = new CivetServer(cpp_options); + + server->addHandler(rooturi, handler); + + return server; + +} + +CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler) { + const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), 0 }; + + std::vector<std::string> cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + CivetServer *server = new CivetServer(cpp_options); + + server->addHandler(rooturi, handler); + + return server; + +} + +bool parse_http_components(const std::string &url, std::string &port, std::string &scheme, std::string &path) { + regex_t regex; + + const char *regexstr = "^(http|https)://(localhost:)([0-9]+)?(/.*)$"; + + int ret = regcomp(&regex, regexstr, REG_EXTENDED); + if (ret) { + return false; + } + + size_t potentialGroups = regex.re_nsub + 1; + regmatch_t groups[potentialGroups]; + if (regexec(&regex, url.c_str(), potentialGroups, groups, 0) == 0) { + for (int i = 0; i < potentialGroups; i++) { + if (groups[i].rm_so == -1) + break; + + std::string str(url.data() + 
groups[i].rm_so, groups[i].rm_eo - groups[i].rm_so); + switch (i) { + case 1: + scheme = str; + break; + case 3: + port = str; + break; + case 4: + path = str; + break; + default: + break; + } + } + } + if (path.empty() || scheme.empty() || port.empty()) + return false; + + regfree(&regex); + + return true; + +} + +static void stop_webserver(CivetServer *server) { + if (server != nullptr) + delete server; + + /* Un-initialize the library */ + mg_exit_library(); +} + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/ThreadPoolAdjust.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/ThreadPoolAdjust.cpp b/extensions/http-curl/tests/ThreadPoolAdjust.cpp new file mode 100644 index 0000000..13524d6 --- /dev/null +++ b/extensions/http-curl/tests/ThreadPoolAdjust.cpp @@ -0,0 +1,115 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "HTTPClient.h" +#include "InvokeHTTP.h" +#include "processors/ListenHTTP.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "CivetServer.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "controllers/SSLContextService.h" +#include "TestServer.h" +#include "HTTPIntegrationBase.h" +#include "processors/InvokeHTTP.h" +#include "processors/ListenHTTP.h" +#include "processors/LogAttribute.h" + +class HttpTestHarness : public IntegrationBase { + public: + HttpTestHarness() { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); + } + + void testSetup() { + LogTestController::getInstance().setDebug<minifi::FlowController>(); + LogTestController::getInstance().setDebug<core::ProcessGroup>(); + LogTestController::getInstance().setDebug<minifi::SchedulingAgent>(); + LogTestController::getInstance().setDebug<core::ProcessContext>(); + LogTestController::getInstance().setDebug<processors::InvokeHTTP>(); + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setDebug<processors::ListenHTTP>(); + LogTestController::getInstance().setDebug<processors::ListenHTTP::WriteCallback>(); + LogTestController::getInstance().setDebug<processors::ListenHTTP::Handler>(); + LogTestController::getInstance().setDebug<processors::LogAttribute>(); + LogTestController::getInstance().setDebug<core::Processor>(); + 
LogTestController::getInstance().setDebug<minifi::ThreadedSchedulingAgent>(); + LogTestController::getInstance().setDebug<minifi::TimerDrivenSchedulingAgent>(); + LogTestController::getInstance().setDebug<minifi::core::ProcessSession>(); + std::fstream file; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + configuration->set("nifi.flow.engine.threads", "1"); + } + + void cleanup() { + unlink(ss.str().c_str()); + } + + void runAssertions() { + assert(LogTestController::getInstance().contains("curl performed") == true); + assert(LogTestController::getInstance().contains("Size:1024 Offset:0") == true); + assert(LogTestController::getInstance().contains("Size:0 Offset:0") == false); + } + + protected: + char *dir; + std::stringstream ss; + TestController testController; +}; + +int main(int argc, char **argv) { + std::string key_dir, test_file_location, url; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + } + + HttpTestHarness harness; + + harness.setKeyDir(key_dir); + + harness.run(test_file_location); + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b8e45cbf/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp new file mode 100644 index 0000000..81d2714 --- /dev/null +++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp @@ -0,0 +1,315 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <uuid/uuid.h>
+#include <fstream>
+#include <map>
+#include <memory>
+#include <utility>
+#include <string>
+#include <set>
+#include "FlowController.h"
+#include "io/BaseStream.h"
+#include "TestBase.h"
+#include "processors/GetFile.h"
+#include "core/Core.h"
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "processors/ListenHTTP.h"
+#include "core/FlowFile.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "core/Processor.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/ProcessorNode.h"
+#include "processors/InvokeHTTP.h"
+#include "processors/ListenHTTP.h"
+#include "processors/LogAttribute.h"
+
+// Builds a ListenHTTP processor (port 8686, base path /testytesttest) and an
+// InvokeHTTP processor configured to POST to that URL, connects them by hand,
+// triggers both twice, and requires the log to contain
+// "exiting because method is POST".
+TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
+  TestController testController;
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+  LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>();
+
+  std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+  // getfileprocessor and logAttribute are constructed here but not referenced
+  // again in this test case.
+  std::shared_ptr<core::Processor> getfileprocessor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+
+  std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  std::shared_ptr<core::Processor> listenhttp = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp");
+
+  std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp");
+  uuid_t processoruuid;
+  REQUIRE(true == listenhttp->getUUID(processoruuid));
+
+  uuid_t invokehttp_uuid;
+  REQUIRE(true == invokehttp->getUUID(invokehttp_uuid));
+
+  // gcConnection and laConnection are configured but never attached to a
+  // processor in this test case.
+  std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
+  gcConnection->setRelationship(core::Relationship("success", "description"));
+
+  std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute");
+  laConnection->setRelationship(core::Relationship("success", "description"));
+
+  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
+  connection->setRelationship(core::Relationship("success", "description"));
+
+  std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp");
+
+  connection2->setRelationship(core::Relationship("No Retry", "description"));
+
+  // link the connections so that we can test results at the end for this
+  connection->setSource(listenhttp);
+
+  connection2->setSourceUUID(invokehttp_uuid);
+  connection->setSourceUUID(processoruuid);
+  connection->setDestinationUUID(invokehttp_uuid);
+
+  listenhttp->addConnection(connection);
+  invokehttp->addConnection(connection);
+  invokehttp->addConnection(connection2);
+
+  std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(listenhttp);
+  std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(invokehttp);
+  std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+  std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+  std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo);
+  // The listener's port and base path must match the URL given to InvokeHTTP below.
+  context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8686");
+  context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest");
+
+  context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST");
+  context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8686/testytesttest");
+  auto session = std::make_shared<core::ProcessSession>(context);
+  auto session2 = std::make_shared<core::ProcessSession>(context2);
+
+  REQUIRE(listenhttp->getName() == "listenhttp");
+
+  std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
+
+  // First pass: schedule and trigger the listener, then the invoker.
+  std::shared_ptr<core::FlowFile> record;
+  listenhttp->setScheduledState(core::ScheduledState::RUNNING);
+  listenhttp->onSchedule(context, factory);
+  listenhttp->onTrigger(context, session);
+
+  invokehttp->incrementActiveTasks();
+  invokehttp->setScheduledState(core::ScheduledState::RUNNING);
+  std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
+  invokehttp->onSchedule(context2, factory2);
+  invokehttp->onTrigger(context2, session2);
+
+  // After the first pass the listener session must hold no flow file and no
+  // provenance events.
+  provenance::ProvenanceReporter *reporter = session->getProvenanceReporter();
+  std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
+  record = session->get();
+  REQUIRE(record == nullptr);
+  REQUIRE(records.size() == 0);
+
+  // Second pass: trigger both processors again and commit their sessions.
+  listenhttp->incrementActiveTasks();
+  listenhttp->setScheduledState(core::ScheduledState::RUNNING);
+  listenhttp->onTrigger(context, session);
+
+  reporter = session->getProvenanceReporter();
+
+  records = reporter->getEvents();
+  session->commit();
+
+  invokehttp->incrementActiveTasks();
+  invokehttp->setScheduledState(core::ScheduledState::RUNNING);
+  invokehttp->onTrigger(context2, session2);
+
+  session2->commit();
+  records = reporter->getEvents();
+
+  // Any provenance event recorded on the listener session must name the listener.
+  for (provenance::ProvenanceEventRecord *provEventRecord : records) {
+    REQUIRE(provEventRecord->getComponentType() == listenhttp->getName());
+  }
+  std::shared_ptr<core::FlowFile> ffr = session2->get();
+  REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST"));
+  LogTestController::getInstance().reset();
+}
+
+// Output stream callback that writes a fixed test string to the stream it is
+// given and returns whatever stream->write returns.
+class CallBack : public minifi::OutputStreamCallback {
+ public:
+  CallBack() {
+  }
+  virtual ~CallBack() {
+  }
+  virtual int64_t process(std::shared_ptr<minifi::io::BaseStream> stream) {
+    // leaving the typo for posterity sake
+    std::string st = "we're gnna write some test stuff";
+    return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(st.c_str())), st.length());
+  }
+};
+
+// Same hand-wired ListenHTTP/InvokeHTTP pair as the first test case, on port
+// 8680. Here a flow file with content is written through the InvokeHTTP
+// session before the processors are triggered, and the invoker is triggered
+// before the listener in the first pass.
+TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
+  TestController testController;
+  LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>();
+
+  std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
+
+  // getfileprocessor and logAttribute are constructed here but not referenced
+  // again in this test case.
+  std::shared_ptr<core::Processor> getfileprocessor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2");
+
+  std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute");
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  std::shared_ptr<core::Processor> listenhttp = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp");
+
+  std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp");
+  uuid_t processoruuid;
+  REQUIRE(true == listenhttp->getUUID(processoruuid));
+
+  uuid_t invokehttp_uuid;
+  REQUIRE(true == invokehttp->getUUID(invokehttp_uuid));
+
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+
+  // gcConnection and laConnection are configured but never attached to a
+  // processor in this test case.
+  std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
+  gcConnection->setRelationship(core::Relationship("success", "description"));
+
+  std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute");
+  laConnection->setRelationship(core::Relationship("success", "description"));
+
+  std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection");
+  connection->setRelationship(core::Relationship("success", "description"));
+
+  std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp");
+
+  connection2->setRelationship(core::Relationship("No Retry", "description"));
+
+  // link the connections so that we can test results at the end for this
+  connection->setSource(listenhttp);
+
+  connection->setSourceUUID(invokehttp_uuid);
+  connection->setDestinationUUID(processoruuid);
+
+  // NOTE(review): the next two statements are identical; the second may have
+  // been meant to be setDestinationUUID - confirm against the intended wiring.
+  connection2->setSourceUUID(processoruuid);
+  connection2->setSourceUUID(processoruuid);
+
+  listenhttp->addConnection(connection);
+  invokehttp->addConnection(connection);
+  invokehttp->addConnection(connection2);
+
+
+  std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(listenhttp);
+  std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(invokehttp);
+  std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+  std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo);
+  std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo);
+  // The listener's port and base path must match the URL given to InvokeHTTP below.
+  context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8680");
+  context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest");
+
+  context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST");
+  context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8680/testytesttest");
+  auto session = std::make_shared<core::ProcessSession>(context);
+  auto session2 = std::make_shared<core::ProcessSession>(context2);
+
+  REQUIRE(listenhttp->getName() == "listenhttp");
+
+  std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context);
+
+  std::shared_ptr<core::FlowFile> record;
+
+  CallBack callback;
+
+  // Create a flow file with one attribute and write the callback's content
+  // into it through the InvokeHTTP session.
+  std::map<std::string, std::string> attributes;
+  attributes["testy"] = "test";
+  std::shared_ptr<minifi::FlowFileRecord> flow = std::make_shared<minifi::FlowFileRecord>(repo, content_repo, attributes);
+  session2->write(flow, &callback);
+
+  // First pass: schedule and trigger the invoker, then the listener.
+  invokehttp->incrementActiveTasks();
+  invokehttp->setScheduledState(core::ScheduledState::RUNNING);
+  std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
+  invokehttp->onSchedule(context2, factory2);
+  invokehttp->onTrigger(context2, session2);
+
+  listenhttp->incrementActiveTasks();
+  listenhttp->setScheduledState(core::ScheduledState::RUNNING);
+  listenhttp->onSchedule(context, factory);
+  listenhttp->onTrigger(context, session);
+
+  // After the first pass the listener session must hold no flow file and no
+  // provenance events.
+  provenance::ProvenanceReporter *reporter = session->getProvenanceReporter();
+  std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents();
+  record = session->get();
+  REQUIRE(record == nullptr);
+  REQUIRE(records.size() == 0);
+
+  // Second pass: trigger both processors again and commit their sessions.
+  listenhttp->incrementActiveTasks();
+  listenhttp->setScheduledState(core::ScheduledState::RUNNING);
+  listenhttp->onTrigger(context, session);
+
+  reporter = session->getProvenanceReporter();
+
+  records = reporter->getEvents();
+  session->commit();
+
+  invokehttp->incrementActiveTasks();
+  invokehttp->setScheduledState(core::ScheduledState::RUNNING);
+  invokehttp->onTrigger(context2, session2);
+
+  session2->commit();
+  records = reporter->getEvents();
+
+  // Any provenance event recorded on the listener session must name the listener.
+  for (provenance::ProvenanceEventRecord *provEventRecord : records) {
+    REQUIRE(provEventRecord->getComponentType() == listenhttp->getName());
+  }
+  std::shared_ptr<core::FlowFile> ffr = session2->get();
+  REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST"));
+  LogTestController::getInstance().reset();
+}
+
+// Same POST scenario on port 8685, but the processors are created and run
+// through a TestPlan instead of being wired by hand.
+TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
+  TestController testController;
+  LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>();
+  LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::ListenHTTP>();
+  LogTestController::getInstance().setInfo<core::Processor>();
+
+  std::shared_ptr<TestPlan> plan = testController.createPlan();
+  std::shared_ptr<core::Processor> processor = plan->addProcessor("ListenHTTP", "listenhttp", core::Relationship("No Retry", "description"), false);
+  std::shared_ptr<core::Processor> invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp", core::Relationship("success", "description"), true);
+
+  // The listener's port and base path must match the URL given to InvokeHTTP below.
+  REQUIRE(true == plan->setProperty(processor, org::apache::nifi::minifi::processors::ListenHTTP::Port.getName(), "8685"));
+  REQUIRE(true == plan->setProperty(processor, org::apache::nifi::minifi::processors::ListenHTTP::BasePath.getName(), "/testytesttest"));
+
+  REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::Method.getName(), "POST"));
+  REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::URL.getName(), "http://localhost:8685/testytesttest"));
+  plan->reset();
+  testController.runSession(plan, true);
+
+  // After the first run the plan must report no current flow file and no
+  // provenance records.
+  std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords();
+  std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
+  REQUIRE(record == nullptr);
+  REQUIRE(records.size() == 0);
+
+  plan->reset();
+  testController.runSession(plan, true);
+
+  records = plan->getProvenanceRecords();
+  record = plan->getCurrentFlowFile();
+
+  // Any provenance record from the second run must name the listener processor.
+  for (provenance::ProvenanceEventRecord *provEventRecord : records) {
+    REQUIRE(provEventRecord->getComponentType() == processor->getName());
+  }
+  std::shared_ptr<core::FlowFile> ffr = plan->getCurrentFlowFile();
+  REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST"));
+  LogTestController::getInstance().reset();
+}
