http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libminifi/test/curl-tests/CMakeLists.txt b/libminifi/test/curl-tests/CMakeLists.txt deleted file mode 100644 index b645da1..0000000 --- a/libminifi/test/curl-tests/CMakeLists.txt +++ /dev/null @@ -1,76 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -file(GLOB CURL_UNIT_TESTS "unit/*.cpp") -file(GLOB CURL_INTEGRATION_TESTS "*.cpp") - -SET(CURL_INT_TEST_COUNT 0) - -FOREACH(testfile ${CURL_UNIT_TESTS}) - get_filename_component(testfilename "${testfile}" NAME_WE) - add_executable("${testfilename}" "${testfile}") - target_include_directories(${testfilename} BEFORE PRIVATE ${CURL_INCLUDE_DIRS}) - target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/") - target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/client/") - target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/processors/") - target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/protocols/") - target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/sitetosite/") - target_link_libraries(${testfilename} ${CURL_LIBRARIES} ) - createTests("${testfilename}") - target_link_libraries(${testfilename} ${CATCH_MAIN_LIB}) - if (APPLE) - target_link_libraries ("${testfilename}" -Wl,-all_load minifi-http-curl ) - else () - target_link_libraries ("${testfilename}" -Wl,--whole-archive minifi-http-curl -Wl,--no-whole-archive) - endif() - MATH(EXPR CURL_INT_TEST_COUNT "${CURL_INT_TEST_COUNT}+1") -ENDFOREACH() - -FOREACH(testfile ${CURL_INTEGRATION_TESTS}) - get_filename_component(testfilename "${testfile}" NAME_WE) - add_executable("${testfilename}" "${testfile}") - target_include_directories(${testfilename} BEFORE PRIVATE ${CURL_INCLUDE_DIRS}) - target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/") - target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/client/") - target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/processors/") - target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/protocols/") - target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/sitetosite/") - target_link_libraries(${testfilename} ${CURL_LIBRARIES} ) - createTests("${testfilename}") - if (APPLE) - target_link_libraries ("${testfilename}" -Wl,-all_load minifi-http-curl ) - else () - target_link_libraries ("${testfilename}" -Wl,--whole-archive minifi-http-curl -Wl,--no-whole-archive) - endif() - MATH(EXPR CURL_INT_TEST_COUNT "${CURL_INT_TEST_COUNT}+1") -ENDFOREACH() - -message("-- Finished building ${CURL_INT_TEST_COUNT} libcURL integration test file(s)...") - -add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") -add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/") -add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestNull.yml" "${TEST_RESOURCES}/") -add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGetSecure.yml" "${TEST_RESOURCES}/") -add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" "${TEST_RESOURCES}/") -add_test(NAME HttpPostIntegrationTestChunked COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/") -add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/C2VerifyServeResults.yml" "${TEST_RESOURCES}/") -add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/C2VerifyHeartbeatAndStop.yml" "${TEST_RESOURCES}/") -add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8077/nifi-api/site-to-site") -add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegrationTests "${TEST_RESOURCES}/TestControllerServices.yml" "${TEST_RESOURCES}/") -add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/ThreadPoolAdjust.yml" "${TEST_RESOURCES}/")
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/ControllerServiceIntegrationTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/curl-tests/ControllerServiceIntegrationTests.cpp b/libminifi/test/curl-tests/ControllerServiceIntegrationTests.cpp deleted file mode 100644 index d34d65e..0000000 --- a/libminifi/test/curl-tests/ControllerServiceIntegrationTests.cpp +++ /dev/null @@ -1,160 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#undef NDEBUG -#include <cassert> -#include <chrono> -#include <fstream> -#include <memory> -#include <string> -#include <utility> -#include <thread> -#include <type_traits> -#include <vector> - -#include "../include/core/controller/ControllerServiceMap.h" -#include "../include/core/controller/StandardControllerServiceNode.h" -#include "../include/core/controller/StandardControllerServiceProvider.h" -#include "controllers/SSLContextService.h" -#include "../include/core/Core.h" -#include "../include/core/logging/LoggerConfiguration.h" -#include "../include/core/ProcessGroup.h" -#include "../include/core/Resource.h" -#include "../include/core/yaml/YamlConfiguration.h" -#include "../include/FlowController.h" -#include "../include/properties/Configure.h" -#include "../unit/MockClasses.h" -#include "../unit/ProvenanceTestHelper.h" - -REGISTER_RESOURCE(MockControllerService); -REGISTER_RESOURCE(MockProcessor); - -std::shared_ptr<core::controller::StandardControllerServiceNode> newCsNode(std::shared_ptr<core::controller::ControllerServiceProvider> provider, const std::string id) { - std::shared_ptr<core::controller::ControllerService> service = std::make_shared<MockControllerService>(); - std::shared_ptr<core::controller::StandardControllerServiceNode> testNode = std::make_shared<core::controller::StandardControllerServiceNode>(service, provider, id, - std::make_shared<minifi::Configure>()); - return testNode; -} - -void waitToVerifyProcessor() { - std::this_thread::sleep_for(std::chrono::seconds(2)); -} - -int main(int argc, char **argv) { - std::string test_file_location; - std::string key_dir; - - if (argc > 2) { - test_file_location = argv[1]; - key_dir = argv[1]; - } - - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - - std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); - std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>(); - - configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); - std::string client_cert = "cn.crt.pem"; - std::string priv_key_file = "cn.ckey.pem"; - std::string passphrase = "cn.pass"; - std::string ca_cert = "nifi-cert.pem"; - configuration->set(minifi::Configure::nifi_security_client_certificate, test_file_location); - configuration->set(minifi::Configure::nifi_security_client_private_key, priv_key_file); - configuration->set(minifi::Configure::nifi_security_client_pass_phrase, passphrase); - configuration->set(minifi::Configure::nifi_default_directory, key_dir); - - std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - content_repo->initialize(configuration); - std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( - new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); - std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); - - std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), - content_repo, - DEFAULT_ROOT_GROUP_NAME, - true); - - disabled = false; - std::shared_ptr<core::controller::ControllerServiceMap> map = std::make_shared<core::controller::ControllerServiceMap>(); - - core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); - - std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); - std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get()); - ptr.release(); - - std::shared_ptr<core::controller::StandardControllerServiceProvider> provider = std::make_shared<core::controller::StandardControllerServiceProvider>(map, pg, std::make_shared<minifi::Configure>()); - std::shared_ptr<core::controller::ControllerServiceNode> mockNode = pg->findControllerService("MockItLikeIts1995"); - assert(mockNode != nullptr); - mockNode->enable(); - std::vector<std::shared_ptr<core::controller::ControllerServiceNode> > linkedNodes = mockNode->getLinkedControllerServices(); - assert(linkedNodes.size() == 1); - - std::shared_ptr<core::controller::ControllerServiceNode> notexistNode = pg->findControllerService("MockItLikeItsWrong"); - assert(notexistNode == nullptr); - - std::shared_ptr<core::controller::ControllerServiceNode> ssl_client_cont = nullptr; - std::shared_ptr<minifi::controllers::SSLContextService> ssl_client = nullptr; - { - std::lock_guard<std::mutex> lock(control_mutex); - controller->load(); - controller->start(); - ssl_client_cont = controller->getControllerServiceNode("SSLClientServiceTest"); - ssl_client_cont->enable(); - assert(ssl_client_cont != nullptr); - assert(ssl_client_cont->getControllerServiceImplementation() != nullptr); - ssl_client = std::static_pointer_cast<minifi::controllers::SSLContextService>(ssl_client_cont->getControllerServiceImplementation()); - } - assert(ssl_client->getCACertificate().length() > 0); - // now let's disable one of the controller services. - std::shared_ptr<core::controller::ControllerServiceNode> cs_id = controller->getControllerServiceNode("ID"); - assert(cs_id != nullptr); - { - std::lock_guard<std::mutex> lock(control_mutex); - controller->disableControllerService(cs_id); - disabled = true; - waitToVerifyProcessor(); - } - { - std::lock_guard<std::mutex> lock(control_mutex); - controller->enableControllerService(cs_id); - disabled = false; - waitToVerifyProcessor(); - } - std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = controller->getControllerServiceNode("MockItLikeIts1995"); - assert(cs_id->enabled()); -{ - std::lock_guard<std::mutex> lock(control_mutex); - controller->disableReferencingServices(mock_cont); - disabled = true; - waitToVerifyProcessor(); - } - assert(cs_id->enabled() == false); -{ - std::lock_guard<std::mutex> lock(control_mutex); - controller->enableReferencingServices(mock_cont); - disabled = false; - waitToVerifyProcessor(); - } - assert(cs_id->enabled() == true); - - controller->waitUnload(60000); - return 0; -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/GetFileNoData.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/curl-tests/GetFileNoData.cpp b/libminifi/test/curl-tests/GetFileNoData.cpp deleted file mode 100644 index 8bc2a4e..0000000 --- a/libminifi/test/curl-tests/GetFileNoData.cpp +++ /dev/null @@ -1,184 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <sys/stat.h> -#undef NDEBUG -#include <cassert> -#include <utility> -#include <chrono> -#include <fstream> -#include <memory> -#include <string> -#include <thread> -#include <type_traits> -#include <vector> -#include <iostream> -#include <sstream> -#include "../TestBase.h" -#include "utils/StringUtils.h" -#include "core/Core.h" -#include "../include/core/logging/Logger.h" -#include "core/ProcessGroup.h" -#include "core/yaml/YamlConfiguration.h" -#include "FlowController.h" -#include "properties/Configure.h" -#include "../unit/ProvenanceTestHelper.h" -#include "io/StreamFactory.h" -#include "c2/C2Agent.h" -#include "CivetServer.h" -#include <cstring> -#include "RESTSender.h" - -void waitToVerifyProcessor() { - std::this_thread::sleep_for(std::chrono::seconds(10)); -} - -static std::vector<std::string> responses; - -class ConfigHandler : public CivetHandler { - public: - bool handlePost(CivetServer *server, struct mg_connection *conn) { - if (responses.size() > 0) { - std::string top_str = responses.back(); - responses.pop_back(); - mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " - "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", - top_str.length()); - mg_printf(conn, "%s", top_str.c_str()); - } else { - mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); - } - - return true; - } - - bool handleGet(CivetServer *server, struct mg_connection *conn) { - std::ifstream myfile(test_file_location_.c_str()); - - if (myfile.is_open()) { - std::stringstream buffer; - buffer << myfile.rdbuf(); - std::string str = buffer.str(); - myfile.close(); - mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " - "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", - str.length()); - mg_printf(conn, "%s", str.c_str()); - } else { - mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); - } - - return true; - } - std::string test_file_location_; -}; - -int main(int argc, char **argv) { - mg_init_library(0); - LogTestController::getInstance().setInfo<minifi::FlowController>(); - LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>(); - LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); - LogTestController::getInstance().setDebug<minifi::c2::C2Agent>(); - - const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 }; - std::vector<std::string> cpp_options; - for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { - cpp_options.push_back(options[i]); - } - - CivetServer server(cpp_options); - ConfigHandler h_ex; - server.addHandler("/update", h_ex); - std::string key_dir, test_file_location; - if (argc > 1) { - h_ex.test_file_location_ = test_file_location = argv[1]; - key_dir = argv[2]; - } - std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" - "\"operation\" : \"update\", " - "\"operationid\" : \"8675309\", " - "\"name\": \"configuration\"" - "}]}"; - - responses.push_back(heartbeat_response); - - std::ifstream myfile(test_file_location.c_str()); - - if (myfile.is_open()) { - std::stringstream buffer; - buffer << myfile.rdbuf(); - std::string str = buffer.str(); - myfile.close(); - std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" - "\"operation\" : \"update\", " - "\"operationid\" : \"8675309\", " - "\"name\": \"configuration\", \"content\": { \"location\": \"http://localhost:9090/update\"}}]}"; - responses.push_back(response); - } - - std::shared_ptr<minifi::Configure> configuration = std::make_shared< - minifi::Configure>(); - - configuration->set("c2.rest.url", - "http://localhost:9090/update"); - mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - - std::shared_ptr<core::Repository> test_repo = - std::make_shared<TestRepository>(); - std::shared_ptr<core::Repository> test_flow_repo = std::make_shared< - TestFlowRepository>(); - - configuration->set(minifi::Configure::nifi_flow_configuration_file, - test_file_location); - - std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared - <minifi::io::StreamFactory>(configuration); - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr - <core::YamlConfiguration - >(new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, - configuration, - test_file_location)); - std::shared_ptr<TestRepository> repo = std::static_pointer_cast - <TestRepository>(test_repo); - - std::shared_ptr<minifi::FlowController> controller = - std::make_shared<minifi::FlowController - >(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, true); - - core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, - configuration, - test_file_location); - - std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot( - test_file_location); - std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup - >(ptr.get()); - ptr.release(); - - controller->load(); - controller->start(); - waitToVerifyProcessor(); - - controller->waitUnload(60000); - std::string logs = LogTestController::getInstance().log_output.str(); - assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos); - LogTestController::getInstance().reset(); - - return 0; -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/HTTPSiteToSiteTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/curl-tests/HTTPSiteToSiteTests.cpp b/libminifi/test/curl-tests/HTTPSiteToSiteTests.cpp deleted file mode 100644 index 775739d..0000000 --- a/libminifi/test/curl-tests/HTTPSiteToSiteTests.cpp +++ /dev/null @@ -1,262 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#define CURLOPT_SSL_VERIFYPEER_DISABLE 1 -#include <sys/stat.h> -#undef NDEBUG -#include <cassert> -#include <utility> -#include <chrono> -#include <fstream> -#include <memory> -#include <string> -#include <thread> -#include <type_traits> -#include <vector> -#include <iostream> -#include <sstream> -#include "HTTPClient.h" -#include "CivetServer.h" -#include "sitetosite/HTTPProtocol.h" -#include "InvokeHTTP.h" -#include "../TestBase.h" -#include "utils/StringUtils.h" -#include "core/Core.h" -#include "../include/core/logging/Logger.h" -#include "core/ProcessGroup.h" -#include "core/yaml/YamlConfiguration.h" -#include "FlowController.h" -#include "properties/Configure.h" -#include "io/StreamFactory.h" -#include "RemoteProcessorGroupPort.h" -#include "core/ConfigurableComponent.h" -#include "../TestServer.h" -#include "../integration/IntegrationBase.h" -#include "sitetositehttp/HTTPHandlers.h" -#include "client/HTTPStream.h" - -class SiteToSiteTestHarness : public IntegrationBase { - public: - explicit SiteToSiteTestHarness(bool isSecure) - : isSecure(isSecure) { - char format[] = "/tmp/ssth.XXXXXX"; - dir = testController.createTempDirectory(format); - } - - void testSetup() { - LogTestController::getInstance().setDebug<minifi::RemoteProcessorGroupPort>(); - LogTestController::getInstance().setDebug<minifi::sitetosite::HttpSiteToSiteClient>(); - LogTestController::getInstance().setDebug<minifi::sitetosite::SiteToSiteClient>(); - LogTestController::getInstance().setDebug<utils::HTTPClient>(); - LogTestController::getInstance().setTrace<minifi::controllers::SSLContextService>(); - LogTestController::getInstance().setInfo<minifi::FlowController>(); - LogTestController::getInstance().setDebug<core::ConfigurableComponent>(); - - std::fstream file; - ss << dir << "/" << "tstFile.ext"; - file.open(ss.str(), std::ios::out); - file << "tempFile"; - file.close(); - - configuration->set("nifi.c2.enable", "false"); - configuration->set("nifi.remote.input.http.enabled", "true"); - configuration->set("nifi.remote.input.socket.port", "8082"); - } - - virtual void waitToVerifyProcessor() { - std::this_thread::sleep_for(std::chrono::seconds(3)); - } - - void cleanup() { - unlink(ss.str().c_str()); - } - - void runAssertions() { - } - - protected: - bool isSecure; - char *dir; - std::stringstream ss; - TestController testController; -}; - -struct test_profile { - test_profile() - : flow_url_broken(false), - transaction_url_broken(false), - empty_transaction_url(false), - no_delete(false), - invalid_checksum(false) { - } - - bool allFalse() const { - return !flow_url_broken && !transaction_url_broken && !empty_transaction_url && !no_delete && !invalid_checksum; - } - // tests for a broken flow file url - bool flow_url_broken; - // transaction url will return incorrect information - bool transaction_url_broken; - // Location will be absent within the - bool empty_transaction_url; - // delete url is not supported. - bool no_delete; - // invalid checksum error - bool invalid_checksum; -}; - -void run_variance(std::string test_file_location, bool isSecure, std::string url, const struct test_profile &profile) { - SiteToSiteTestHarness harness(isSecure); - - SiteToSiteLocationResponder responder(isSecure); - - TransactionResponder transaction_response(url, "471deef6-2a6e-4a7d-912a-81cc17e3a204", true, profile.transaction_url_broken, profile.empty_transaction_url); - - std::string transaction_id = transaction_response.getTransactionId(); - - harness.setKeyDir(""); - - std::string controller_loc = url + "/controller"; - - harness.setUrl(controller_loc, &responder); - - std::string transaction_url = url + "/data-transfer/input-ports/471deef6-2a6e-4a7d-912a-81cc17e3a204/transactions"; - std::string action_url = url + "/site-to-site/input-ports/471deef6-2a6e-4a7d-912a-81cc17e3a204/transactions"; - - std::string transaction_output_url = url + "/data-transfer/output-ports/471deef6-2a6e-4a7d-912a-81cc17e3a203/transactions"; - std::string action_output_url = url + "/site-to-site/output-ports/471deef6-2a6e-4a7d-912a-81cc17e3a203/transactions"; - - harness.setUrl(transaction_url, &transaction_response); - - std::string peer_url = url + "/site-to-site/peers"; - - PeerResponder peer_response(url); - - harness.setUrl(peer_url, &peer_response); - - std::string flow_url = action_url + "/" + transaction_id + "/flow-files"; - - FlowFileResponder flowResponder(true, profile.flow_url_broken, profile.invalid_checksum); - flowResponder.setFlowUrl(flow_url); - auto producedFlows = flowResponder.getFlows(); - - TransactionResponder transaction_response_output(url, "471deef6-2a6e-4a7d-912a-81cc17e3a203", false, profile.transaction_url_broken, profile.empty_transaction_url); - std::string transaction_output_id = transaction_response_output.getTransactionId(); - transaction_response_output.setFeed(producedFlows); - - harness.setUrl(transaction_output_url, &transaction_response_output); - - std::string flow_output_url = action_output_url + "/" + transaction_output_id + "/flow-files"; - - FlowFileResponder flowOutputResponder(false, profile.flow_url_broken, profile.invalid_checksum); - flowOutputResponder.setFlowUrl(flow_output_url); - flowOutputResponder.setFeed(producedFlows); - - harness.setUrl(flow_url, &flowResponder); - harness.setUrl(flow_output_url, &flowOutputResponder); - - if (!profile.no_delete) { - std::string delete_url = transaction_url + "/" + transaction_id; - DeleteTransactionResponder deleteResponse(delete_url, "201 OK", 12); - harness.setUrl(delete_url, &deleteResponse); - - std::string delete_output_url = transaction_output_url + "/" + transaction_output_id; - DeleteTransactionResponder deleteOutputResponse(delete_output_url, "201 OK", producedFlows); - harness.setUrl(delete_output_url, &deleteOutputResponse); - } - - harness.run(test_file_location); - - std::stringstream assertStr; - if (profile.allFalse()) { - assertStr << "Site2Site transaction " << transaction_id << " peer finished transaction"; - assert(LogTestController::getInstance().contains(assertStr.str()) == true); - } else if (profile.empty_transaction_url) { - assert(LogTestController::getInstance().contains("Location is empty") == true); - } else if (profile.transaction_url_broken) { - assert(LogTestController::getInstance().contains("Could not create transaction, intent is ohstuff") == true); - } else if (profile.invalid_checksum) { - assertStr << "Site2Site transaction " << transaction_id << " peer confirm transaction with CRC Imawrongchecksumshortandstout"; - assert(LogTestController::getInstance().contains(assertStr.str()) == true); - assertStr.str(std::string()); - assertStr << "Site2Site transaction " << transaction_id << " CRC not matched"; - assert(LogTestController::getInstance().contains(assertStr.str()) == true); - assertStr.str(std::string()); - assertStr << "Site2Site delete transaction " << transaction_id; - assert(LogTestController::getInstance().contains(assertStr.str()) == true); - } else if (profile.no_delete) { - assert(LogTestController::getInstance().contains("Received 401 response code from delete") == true); - } else { - assertStr << "Site2Site transaction " << transaction_id << " peer unknown respond code 254"; - assert(LogTestController::getInstance().contains(assertStr.str()) == true); - } - LogTestController::getInstance().reset(); -} - -int main(int argc, char **argv) { - transaction_id = 0; - transaction_id_output = 0; - std::string key_dir, test_file_location, url; - if (argc > 1) { - test_file_location = argv[1]; - key_dir = argv[2]; - url = argv[3]; - } - - bool isSecure = false; - if (url.find("https") != std::string::npos) { - isSecure = true; - } - - { - struct test_profile profile; - run_variance(test_file_location, isSecure, url, profile); - } - - { - struct test_profile profile; - profile.flow_url_broken = true; - run_variance(test_file_location, isSecure, url, profile); - } - - { - struct test_profile profile; - profile.empty_transaction_url = true; - run_variance(test_file_location, isSecure, url, profile); - } - - { - struct test_profile profile; - profile.transaction_url_broken = true; - run_variance(test_file_location, isSecure, url, profile); - } - - { - struct test_profile profile; - profile.no_delete = true; - run_variance(test_file_location, isSecure, url, profile); - } - - { - struct test_profile profile; - profile.invalid_checksum = true; - run_variance(test_file_location, isSecure, url, profile); - } - - return 0; -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/HttpGetIntegrationTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/curl-tests/HttpGetIntegrationTest.cpp b/libminifi/test/curl-tests/HttpGetIntegrationTest.cpp deleted file mode 100644 index 702be14..0000000 --- a/libminifi/test/curl-tests/HttpGetIntegrationTest.cpp +++ /dev/null @@ -1,162 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#define CURLOPT_SSL_VERIFYPEER_DISABLE 1 -#include <sys/stat.h> -#undef NDEBUG -#include <cassert> -#include <utility> -#include <chrono> -#include <fstream> -#include <memory> -#include <string> -#include <thread> -#include <type_traits> -#include <vector> -#include "HTTPClient.h" -#include "InvokeHTTP.h" -#include "../TestServer.h" -#include "../TestBase.h" -#include "utils/StringUtils.h" -#include "core/Core.h" -#include "../include/core/logging/Logger.h" -#include "core/ProcessGroup.h" -#include "core/yaml/YamlConfiguration.h" -#include "FlowController.h" -#include "properties/Configure.h" -#include "../unit/ProvenanceTestHelper.h" -#include "io/StreamFactory.h" -#include "processors/InvokeHTTP.h" -#include "processors/ListenHTTP.h" -#include "processors/LogAttribute.h" - -void waitToVerifyProcessor() { - std::this_thread::sleep_for(std::chrono::seconds(10)); -} - -int log_message(const struct mg_connection *conn, const char *message) { - puts(message); - return 1; -} - -int ssl_enable(void *ssl_context, void *user_data) { - struct ssl_ctx_st *ctx = (struct ssl_ctx_st *) ssl_context; - return 0; -} - -class HttpResponder : public CivetHandler { - public: - bool handleGet(CivetServer *server, struct mg_connection *conn) { - static const std::string site2site_rest_resp = "hi this is a get test"; - mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " - "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", - site2site_rest_resp.length()); - mg_printf(conn, "%s", site2site_rest_resp.c_str()); - return true; - } -}; - -int main(int argc, char **argv) { - init_webserver(); - LogTestController::getInstance().setDebug<core::Processor>(); - LogTestController::getInstance().setDebug<core::ProcessSession>(); - LogTestController::getInstance().setDebug<utils::HTTPClient>(); - LogTestController::getInstance().setDebug<minifi::controllers::SSLContextService>(); - LogTestController::getInstance().setDebug<minifi::processors::InvokeHTTP>(); - LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); - std::string key_dir, test_file_location; - if (argc > 1) { - test_file_location = argv[1]; - key_dir = argv[2]; - } - std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); - configuration->set(minifi::Configure::nifi_default_directory, key_dir); - mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - - std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); - std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>(); - - configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); - - std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); - - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - - content_repo->initialize(configuration); - - std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( - new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); - std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); - - std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), - content_repo, - DEFAULT_ROOT_GROUP_NAME, - true); - - core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); - - std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); - std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get()); - std::shared_ptr<core::Processor> proc = ptr->findProcessor("invoke"); - assert(proc != nullptr); - - std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc); - - assert(inv != nullptr); - std::string url = ""; - inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url); - ptr.release(); - HttpResponder h_ex; - std::string port, scheme, path; - CivetServer *server = nullptr; - - parse_http_components(url, port, scheme, path); - struct mg_callbacks callback; - if (url.find("localhost") != std::string::npos) { - if (scheme == "https") { - std::string cert = ""; - cert = key_dir + "nifi-cert.pem"; - memset(&callback, 0, sizeof(callback)); - callback.init_ssl = ssl_enable; - port +="s"; - callback.log_message = log_message; - server = start_webserver(port, path, &h_ex, &callback, cert, cert); - } else { - server = start_webserver(port, path, &h_ex); - } - } - controller->load(); - controller->start(); - waitToVerifyProcessor(); - - controller->waitUnload(60000); - if (url.find("localhost") == std::string::npos) { - stop_webserver(server); - exit(1); - } - std::string logs = LogTestController::getInstance().log_output.str(); - - assert(logs.find("key:filename value:") != std::string::npos); - assert(logs.find("key:invokehttp.request.url value:" + url) != std::string::npos); - assert(logs.find("key:invokehttp.status.code value:200") != std::string::npos); - - LogTestController::getInstance().reset(); - rmdir("./content_repository"); - stop_webserver(server); - return 0; -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/HttpPostIntegrationTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/curl-tests/HttpPostIntegrationTest.cpp b/libminifi/test/curl-tests/HttpPostIntegrationTest.cpp deleted file mode 100644 index 1c48f7d..0000000 --- a/libminifi/test/curl-tests/HttpPostIntegrationTest.cpp +++ /dev/null @@ -1,114 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <sys/stat.h> -#undef NDEBUG -#include <cassert> -#include <utility> -#include <chrono> -#include <fstream> -#include <memory> -#include <string> -#include <thread> -#include <type_traits> -#include <vector> -#include <iostream> -#include "HTTPClient.h" -#include "InvokeHTTP.h" -#include "processors/ListenHTTP.h" -#include "processors/LogAttribute.h" -#include <sstream> -#include "../TestBase.h" -#include "utils/StringUtils.h" -#include "core/Core.h" -#include "../include/core/logging/Logger.h" -#include "core/ProcessGroup.h" -#include "core/yaml/YamlConfiguration.h" -#include "FlowController.h" -#include "properties/Configure.h" -#include "../unit/ProvenanceTestHelper.h" -#include "io/StreamFactory.h" -#include "CivetServer.h" -#include "RemoteProcessorGroupPort.h" -#include "core/ConfigurableComponent.h" -#include "controllers/SSLContextService.h" -#include "../TestServer.h" -#include "../integration/IntegrationBase.h" - -class HttpTestHarness : public IntegrationBase { - public: - HttpTestHarness() { - char format[] = "/tmp/ssth.XXXXXX"; - dir = testController.createTempDirectory(format); - } - - void testSetup() { - LogTestController::getInstance().setDebug<minifi::FlowController>(); - LogTestController::getInstance().setDebug<core::ProcessGroup>(); - LogTestController::getInstance().setDebug<minifi::SchedulingAgent>(); - LogTestController::getInstance().setDebug<core::ProcessContext>(); - LogTestController::getInstance().setDebug<processors::InvokeHTTP>(); - LogTestController::getInstance().setDebug<utils::HTTPClient>(); - LogTestController::getInstance().setDebug<processors::ListenHTTP>(); - LogTestController::getInstance().setDebug<processors::ListenHTTP::WriteCallback>(); - LogTestController::getInstance().setDebug<processors::ListenHTTP::Handler>(); - LogTestController::getInstance().setDebug<processors::LogAttribute>(); - LogTestController::getInstance().setDebug<core::Processor>(); - LogTestController::getInstance().setDebug<minifi::ThreadedSchedulingAgent>(); - LogTestController::getInstance().setDebug<minifi::TimerDrivenSchedulingAgent>(); - LogTestController::getInstance().setDebug<minifi::core::ProcessSession>(); - std::fstream file; - ss << dir << "/" << "tstFile.ext"; - file.open(ss.str(), std::ios::out); - file << "tempFile"; - file.close(); - configuration->set("nifi.flow.engine.threads", "8"); - configuration->set("nifi.c2.enable", "false"); - } - - void cleanup() { - unlink(ss.str().c_str()); - } - - void runAssertions() { - assert(LogTestController::getInstance().contains("curl performed") == true); - assert(LogTestController::getInstance().contains("Size:1024 Offset:0") == true); - assert(LogTestController::getInstance().contains("Size:0 Offset:0") == false); - } - - protected: - char *dir; - std::stringstream ss; - TestController testController; -}; - -int main(int argc, char **argv) { - std::string key_dir, test_file_location, url; - if (argc > 1) { - test_file_location = argv[1]; - key_dir = argv[2]; - } - - HttpTestHarness harness; - - harness.setKeyDir(key_dir); - - harness.run(test_file_location); - - return 0; -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/SiteToSiteRestTest.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/curl-tests/SiteToSiteRestTest.cpp b/libminifi/test/curl-tests/SiteToSiteRestTest.cpp deleted file mode 100644 index 13826ab..0000000 --- a/libminifi/test/curl-tests/SiteToSiteRestTest.cpp +++ /dev/null @@ -1,145 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#define CURLOPT_SSL_VERIFYPEER_DISABLE 1 -#include <sys/stat.h> -#undef NDEBUG -#include <cassert> -#include <utility> -#include <chrono> -#include <fstream> -#include <memory> -#include <string> -#include <thread> -#include <type_traits> -#include <vector> -#include <iostream> -#include <sstream> -#include "HTTPClient.h" -#include "InvokeHTTP.h" -#include "../TestBase.h" -#include "utils/StringUtils.h" -#include "core/Core.h" -#include "../include/core/logging/Logger.h" -#include "core/ProcessGroup.h" -#include "core/yaml/YamlConfiguration.h" -#include "FlowController.h" -#include "properties/Configure.h" -#include "../unit/ProvenanceTestHelper.h" -#include "io/StreamFactory.h" -#include "CivetServer.h" -#include "RemoteProcessorGroupPort.h" -#include "core/ConfigurableComponent.h" -#include "controllers/SSLContextService.h" -#include "../TestServer.h" -#include "../integration/IntegrationBase.h" - -class Responder : public CivetHandler { - public: - explicit Responder(bool isSecure) - : isSecure(isSecure) { - } - bool handleGet(CivetServer *server, struct mg_connection *conn) { - std::string site2site_rest_resp = "{" - "\"revision\": {" - "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\"" - "}," - "\"controller\": {" - "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\"," - "\"name\": \"NiFi Flow\"," - "\"remoteSiteListeningPort\": 10001," - "\"siteToSiteSecure\": "; - site2site_rest_resp += (isSecure ? "true" : "false"); - site2site_rest_resp += "}}"; - mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " - "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", - site2site_rest_resp.length()); - mg_printf(conn, "%s", site2site_rest_resp.c_str()); - return true; - } - - protected: - bool isSecure; -}; - -class SiteToSiteTestHarness : public IntegrationBase { - public: - explicit SiteToSiteTestHarness(bool isSecure) - : isSecure(isSecure) { - char format[] = "/tmp/ssth.XXXXXX"; - dir = testController.createTempDirectory(format); - } - - void testSetup() { - LogTestController::getInstance().setDebug<minifi::RemoteProcessorGroupPort>(); - LogTestController::getInstance().setDebug<utils::HTTPClient>(); - LogTestController::getInstance().setTrace<minifi::controllers::SSLContextService>(); - LogTestController::getInstance().setInfo<minifi::FlowController>(); - LogTestController::getInstance().setDebug<core::ConfigurableComponent>(); - - std::fstream file; - ss << dir << "/" << "tstFile.ext"; - file.open(ss.str(), std::ios::out); - file << "tempFile"; - file.close(); - } - - void cleanup() { - unlink(ss.str().c_str()); - } - - void runAssertions() { - if (isSecure) { - assert(LogTestController::getInstance().contains("process group remote site2site port 10001, is secure 1") == true); - } else { - assert(LogTestController::getInstance().contains("process group remote site2site port 10001, is secure 0") == true); - } - } - - protected: - bool isSecure; - char *dir; - std::stringstream ss; - TestController testController; -}; - -int main(int argc, char **argv) { - std::string key_dir, test_file_location, url; - if (argc > 1) { - test_file_location = argv[1]; - key_dir = argv[2]; - url = argv[3]; - } - - bool isSecure = false; - if (url.find("https") != std::string::npos) { - isSecure = true; - } - - SiteToSiteTestHarness harness(isSecure); - - Responder responder(isSecure); - - harness.setKeyDir(key_dir); - - harness.setUrl(url, &responder); - - harness.run(test_file_location); - - return 0; -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/ThreadPoolAdjust.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/curl-tests/ThreadPoolAdjust.cpp b/libminifi/test/curl-tests/ThreadPoolAdjust.cpp deleted file mode 100644 index 53ed7ba..0000000 --- a/libminifi/test/curl-tests/ThreadPoolAdjust.cpp +++ /dev/null @@ -1,114 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <sys/stat.h> -#undef NDEBUG -#include <cassert> -#include <utility> -#include <chrono> -#include <fstream> -#include <memory> -#include <string> -#include <thread> -#include <type_traits> -#include <vector> -#include <iostream> -#include <sstream> -#include "HTTPClient.h" -#include "InvokeHTTP.h" -#include "../TestBase.h" -#include "utils/StringUtils.h" -#include "core/Core.h" -#include "../include/core/logging/Logger.h" -#include "core/ProcessGroup.h" -#include "core/yaml/YamlConfiguration.h" -#include "FlowController.h" -#include "properties/Configure.h" -#include "../unit/ProvenanceTestHelper.h" -#include "io/StreamFactory.h" -#include "CivetServer.h" -#include "RemoteProcessorGroupPort.h" -#include "core/ConfigurableComponent.h" -#include "controllers/SSLContextService.h" -#include "../TestServer.h" -#include "../integration/IntegrationBase.h" -#include "processors/InvokeHTTP.h" -#include "processors/ListenHTTP.h" -#include "processors/LogAttribute.h" - -class HttpTestHarness : public IntegrationBase { - public: - HttpTestHarness() { - char format[] = "/tmp/ssth.XXXXXX"; - dir = testController.createTempDirectory(format); - } - - void testSetup() { - LogTestController::getInstance().setDebug<minifi::FlowController>(); - LogTestController::getInstance().setDebug<core::ProcessGroup>(); - LogTestController::getInstance().setDebug<minifi::SchedulingAgent>(); - LogTestController::getInstance().setDebug<core::ProcessContext>(); - LogTestController::getInstance().setDebug<processors::InvokeHTTP>(); - LogTestController::getInstance().setDebug<utils::HTTPClient>(); - LogTestController::getInstance().setDebug<processors::ListenHTTP>(); - LogTestController::getInstance().setDebug<processors::ListenHTTP::WriteCallback>(); - LogTestController::getInstance().setDebug<processors::ListenHTTP::Handler>(); - LogTestController::getInstance().setDebug<processors::LogAttribute>(); - LogTestController::getInstance().setDebug<core::Processor>(); - LogTestController::getInstance().setDebug<minifi::ThreadedSchedulingAgent>(); - LogTestController::getInstance().setDebug<minifi::TimerDrivenSchedulingAgent>(); - LogTestController::getInstance().setDebug<minifi::core::ProcessSession>(); - std::fstream file; - ss << dir << "/" << "tstFile.ext"; - file.open(ss.str(), std::ios::out); - file << "tempFile"; - file.close(); - configuration->set("nifi.flow.engine.threads", "1"); - } - - void cleanup() { - unlink(ss.str().c_str()); - } - - void runAssertions() { - assert(LogTestController::getInstance().contains("curl performed") == true); - assert(LogTestController::getInstance().contains("Size:1024 Offset:0") == true); - assert(LogTestController::getInstance().contains("Size:0 Offset:0") == false); - } - - protected: - char *dir; - std::stringstream ss; - TestController testController; -}; - -int main(int argc, char **argv) { - std::string key_dir, test_file_location, url; - if (argc > 1) { - test_file_location = argv[1]; - key_dir = argv[2]; - } - - HttpTestHarness harness; - - harness.setKeyDir(key_dir); - - harness.run(test_file_location); - - return 0; -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/sitetositehttp/CivetStream.h ---------------------------------------------------------------------- diff --git a/libminifi/test/curl-tests/sitetositehttp/CivetStream.h b/libminifi/test/curl-tests/sitetositehttp/CivetStream.h deleted file mode 100644 index 571b0ca..0000000 --- a/libminifi/test/curl-tests/sitetositehttp/CivetStream.h +++ /dev/null @@ -1,138 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_ -#define EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_ - -#include <memory> -#include <thread> -#include <mutex> -#include <future> -#include <vector> - -#include "io/BaseStream.h" -#include "civetweb.h" -#include "CivetServer.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace io { - -class CivetStream : public io::BaseStream { - public: - /** - * File Stream constructor that accepts an fstream shared pointer. - * It must already be initialized for read and write. - */ - explicit CivetStream(struct mg_connection *conn) - : io::BaseStream(), conn(conn) { - - } - - virtual ~CivetStream() { - } - /** - * Skip to the specified offset. - * @param offset offset to which we will skip - */ - void seek(uint64_t offset){ - - } - - const uint64_t getSize() const { - return BaseStream::readBuffer; - } - - // data stream extensions - /** - * Reads data and places it into buf - * @param buf buffer in which we extract data - * @param buflen - */ - virtual int readData(std::vector<uint8_t> &buf, int buflen) { - if (buf.capacity() < buflen) { - buf.resize(buflen); - } - int ret = readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen); - - if (ret < buflen) { - buf.resize(ret); - } - return ret; - } - - /** - * Reads data and places it into buf - * @param buf buffer in which we extract data - * @param buflen - */ - virtual int readData(uint8_t *buf, int buflen) { - return mg_read(conn,buf,buflen); - } - - /** - * Write value to the stream using std::vector - * @param buf incoming buffer - * @param buflen buffer to write - * - */ - virtual int writeData(std::vector<uint8_t> &buf, int buflen) { - return 0; - } - - /** - * writes value to stream - * @param value value to write - * @param size size of value - */ - virtual int writeData(uint8_t *value, int size) { - return 0; - } - - protected: - - /** - * Creates a vector and returns the vector using the provided - * type name. - * @param t incoming object - * @returns vector. - */ - template<typename T> - inline std::vector<uint8_t> readBuffer(const T& t) { - std::vector<uint8_t> buf; - buf.resize(sizeof t); - readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t)); - return buf; - } - - void reset(); - - //size_t pos; - struct mg_connection *conn; - - private: - - std::shared_ptr<logging::Logger> logger_; -}; -} /* namespace io */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ - -#endif /* EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/sitetositehttp/HTTPHandlers.h ---------------------------------------------------------------------- diff --git a/libminifi/test/curl-tests/sitetositehttp/HTTPHandlers.h b/libminifi/test/curl-tests/sitetositehttp/HTTPHandlers.h deleted file mode 100644 index 911d6d4..0000000 --- a/libminifi/test/curl-tests/sitetositehttp/HTTPHandlers.h +++ /dev/null @@ -1,321 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "civetweb.h" -#include "CivetServer.h" -#include "concurrentqueue.h" - -#include "CivetStream.h" -#include "io/CRCStream.h" -#ifndef LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ -#define LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ -static std::atomic<int> transaction_id; -static std::atomic<int> transaction_id_output; - -class FlowObj { - public: - FlowObj() - : total_size(0) { - - } - explicit FlowObj(const FlowObj &&other) - : total_size(std::move(other.total_size)), - attributes(std::move(other.attributes)), - data(std::move(other.data)) { - - } - uint64_t total_size; - std::map<std::string, std::string> attributes; - std::vector<uint8_t> data; - -}; - -class SiteToSiteLocationResponder : public CivetHandler { - public: - explicit SiteToSiteLocationResponder(bool isSecure) - : isSecure(isSecure) { - } - bool handleGet(CivetServer *server, struct mg_connection *conn) { - std::string site2site_rest_resp = "{" - "\"revision\": {" - "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\"" - "}," - "\"controller\": {" - "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\"," - "\"name\": \"NiFi Flow\"," - "\"siteToSiteSecure\": "; - site2site_rest_resp += (isSecure ? "true" : "false"); - site2site_rest_resp += "}}"; - mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " - "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", - site2site_rest_resp.length()); - mg_printf(conn, "%s", site2site_rest_resp.c_str()); - return true; - } - - protected: - bool isSecure; -}; - -class PeerResponder : public CivetHandler { - public: - - explicit PeerResponder(const std::string base_url) - : base_url(base_url) { - } - - bool handleGet(CivetServer *server, struct mg_connection *conn) { - std::string site2site_rest_resp = "{\"peers\" : [{ \"hostname\": \"localhost\", \"port\": 8082, \"secure\": false, \"flowFileCount\" : 0 }] }"; - std::stringstream headers; - headers << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n"; - mg_printf(conn, "%s", headers.str().c_str()); - mg_printf(conn, "%s", site2site_rest_resp.c_str()); - return true; - } - - protected: - std::string base_url; -}; - -class TransactionResponder : public CivetHandler { - public: - - explicit TransactionResponder(const std::string base_url, std::string port_id, bool input_port, bool wrong_uri, bool empty_transaction_uri) - : base_url(base_url), - wrong_uri(wrong_uri), - empty_transaction_uri(empty_transaction_uri), - input_port(input_port), - port_id(port_id), - flow_files_feed_(nullptr) { - - if (input_port) { - transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de96"; - transaction_id_str += std::to_string(transaction_id.load()); - transaction_id++; - } else { - transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de95"; - transaction_id_str += std::to_string(transaction_id_output.load()); - transaction_id_output++; - } - } - - bool handlePost(CivetServer *server, struct mg_connection *conn) { - std::string site2site_rest_resp = ""; - std::stringstream headers; - headers << "HTTP/1.1 201 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nx-location-uri-intent: "; - if (wrong_uri) - headers << "ohstuff\r\n"; - else - headers << "transaction-url\r\n"; - - std::string port_type; - - if (input_port) - port_type = "input-ports"; - else - port_type = "output-ports"; - if (!empty_transaction_uri) - headers << "Location: " << base_url << "/site-to-site/" << port_type << "/" << port_id << "/transactions/" << transaction_id_str << "\r\n"; - headers << "Connection: close\r\n\r\n"; - mg_printf(conn, "%s", headers.str().c_str()); - mg_printf(conn, "%s", site2site_rest_resp.c_str()); - return true; - } - - void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) { - flow_files_feed_ = feed; - } - - std::string getTransactionId() { - return transaction_id_str; - } - protected: - std::string base_url; - std::string transaction_id_str; - bool wrong_uri; - bool empty_transaction_uri; - bool input_port; - std::string port_id; - moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_; -}; - -class FlowFileResponder : public CivetHandler { - public: - - explicit FlowFileResponder(bool input_port, bool wrong_uri, bool invalid_checksum) - : wrong_uri(wrong_uri), - input_port(input_port), - invalid_checksum(invalid_checksum), - flow_files_feed_(nullptr) { - } - - moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *getFlows() { - return &flow_files_; - } - - void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) { - flow_files_feed_ = feed; - } - - bool handlePost(CivetServer *server, struct mg_connection *conn) { - std::string site2site_rest_resp = ""; - std::stringstream headers; - - if (!wrong_uri) { - minifi::io::CivetStream civet_stream(conn); - minifi::io::CRCStream<minifi::io::CivetStream> stream(&civet_stream); - uint32_t num_attributes; - uint64_t total_size = 0; - total_size += stream.read(num_attributes); - - auto flow = std::make_shared<FlowObj>(); - - for (int i = 0; i < num_attributes; i++) { - std::string name, value; - total_size += stream.readUTF(name, true); - total_size += stream.readUTF(value, true); - flow->attributes[name] = value; - } - uint64_t length; - total_size += stream.read(length); - - total_size += length; - flow->data.resize(length); - flow->total_size = total_size; - - assert(stream.readData(flow->data.data(), length) == length); - - assert(flow->attributes["path"] == "."); - assert(!flow->attributes["uuid"].empty()); - assert(!flow->attributes["filename"].empty()); - - if (!invalid_checksum) { - site2site_rest_resp = std::to_string(stream.getCRC()); - flow_files_.enqueue(flow); - } else { - site2site_rest_resp = "Imawrongchecksumshortandstout"; - } - - headers << "HTTP/1.1 202 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n"; - } else { - headers << "HTTP/1.1 404\r\nConnection: close\r\n\r\n"; - } - - mg_printf(conn, "%s", headers.str().c_str()); - mg_printf(conn, "%s", site2site_rest_resp.c_str()); - return true; - } - - bool handleGet(CivetServer *server, struct mg_connection *conn) { - - if (flow_files_feed_->size_approx() > 0) { - std::shared_ptr<FlowObj> flow; - uint8_t buf[1]; - std::vector<std::shared_ptr<FlowObj>> flows; - uint64_t total = 0; - - while (flow_files_feed_->try_dequeue(flow)) { - flows.push_back(flow); - total += flow->total_size; - } - mg_printf(conn, "HTTP/1.1 200 OK\r\n" - "Content-Length: %llu\r\n" - "Content-Type: application/octet-stream\r\n" - "Connection: close\r\n\r\n", - total); - minifi::io::BaseStream serializer; - minifi::io::CRCStream<minifi::io::BaseStream> stream(&serializer); - for (auto flow : flows) { - uint32_t num_attributes = flow->attributes.size(); - stream.write(num_attributes); - for (auto entry : flow->attributes) { - stream.writeUTF(entry.first); - stream.writeUTF(entry.second); - } - uint64_t length = flow->data.size(); - stream.write(length); - stream.writeData(flow->data.data(), length); - } - auto ret = mg_write(conn, serializer.getBuffer(), total); - } else { - std::cout << "Nothing to transfer feed" << std::endl; - mg_printf(conn, "HTTP/1.1 200 OK\r\nConnection: " - "close\r\nContent-Length: 0\r\n"); - mg_printf(conn, "Content-Type: text/plain\r\n\r\n"); - - } - return true; - } - - void setFlowUrl(std::string flowUrl) { - base_url = flowUrl; - } - - protected: - // base url - std::string base_url; - // set the wrong url - bool wrong_uri; - // we are running an input port - bool input_port; - // invalid checksum is returned. - bool invalid_checksum; - moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> flow_files_; - moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_; -}; - -class DeleteTransactionResponder : public CivetHandler { - public: - - explicit DeleteTransactionResponder(const std::string base_url, std::string response_code, int expected_resp_code) - : flow_files_feed_(nullptr), - base_url(base_url), - response_code(response_code) { - expected_resp_code_str = std::to_string(expected_resp_code); - } - - explicit DeleteTransactionResponder(const std::string base_url, std::string response_code, moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) - : flow_files_feed_(feed), - base_url(base_url), - response_code(response_code) { - } - - bool handleDelete(CivetServer *server, struct mg_connection *conn) { - - std::string site2site_rest_resp = ""; - std::stringstream headers; - std::string resp; - CivetServer::getParam(conn, "responseCode", resp); - headers << "HTTP/1.1 " << response_code << "\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\n"; - headers << "Connection: close\r\n\r\n"; - mg_printf(conn, "%s", headers.str().c_str()); - mg_printf(conn, "%s", site2site_rest_resp.c_str()); - return true; - } - - void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) { - flow_files_feed_ = feed; - } - - protected: - moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_; - std::string base_url; - std::string expected_resp_code_str; - std::string response_code; -}; - -#endif /* LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/curl-tests/unit/InvokeHTTPTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/curl-tests/unit/InvokeHTTPTests.cpp b/libminifi/test/curl-tests/unit/InvokeHTTPTests.cpp deleted file mode 100644 index cfb6588..0000000 --- a/libminifi/test/curl-tests/unit/InvokeHTTPTests.cpp +++ /dev/null @@ -1,314 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <uuid/uuid.h> -#include <fstream> -#include <map> -#include <memory> -#include <utility> -#include <string> -#include <set> -#include "FlowController.h" -#include "io/BaseStream.h" -#include "../../TestBase.h" -#include "processors/GetFile.h" -#include "core/Core.h" -#include "HTTPClient.h" -#include "InvokeHTTP.h" -#include "core/FlowFile.h" -#include "../../unit/ProvenanceTestHelper.h" -#include "core/Processor.h" -#include "core/ProcessContext.h" -#include "core/ProcessSession.h" -#include "core/ProcessorNode.h" -#include "processors/InvokeHTTP.h" -#include "processors/ListenHTTP.h" -#include "processors/LogAttribute.h" - -TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") { - TestController testController; - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>(); - - std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); - - std::shared_ptr<core::Processor> getfileprocessor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); - - std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); - - char format[] = "/tmp/gt.XXXXXX"; - char *dir = testController.createTempDirectory(format); - - std::shared_ptr<core::Processor> listenhttp = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp"); - - std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp"); - uuid_t processoruuid; - REQUIRE(true == listenhttp->getUUID(processoruuid)); - - uuid_t invokehttp_uuid; - REQUIRE(true == invokehttp->getUUID(invokehttp_uuid)); - - std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection"); - gcConnection->setRelationship(core::Relationship("success", "description")); - - std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute"); - laConnection->setRelationship(core::Relationship("success", "description")); - - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection"); - connection->setRelationship(core::Relationship("success", "description")); - - std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp"); - - connection2->setRelationship(core::Relationship("No Retry", "description")); - - // link the connections so that we can test results at the end for this - connection->setSource(listenhttp); - - connection2->setSourceUUID(invokehttp_uuid); - connection->setSourceUUID(processoruuid); - connection->setDestinationUUID(invokehttp_uuid); - - listenhttp->addConnection(connection); - invokehttp->addConnection(connection); - invokehttp->addConnection(connection2); - - std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(listenhttp); - std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(invokehttp); - std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); - std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo); - context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8686"); - context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest"); - - context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST"); - context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8686/testytesttest"); - auto session = std::make_shared<core::ProcessSession>(context); - auto session2 = std::make_shared<core::ProcessSession>(context2); - - REQUIRE(listenhttp->getName() == "listenhttp"); - - std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context); - - std::shared_ptr<core::FlowFile> record; - listenhttp->setScheduledState(core::ScheduledState::RUNNING); - listenhttp->onSchedule(context, factory); - listenhttp->onTrigger(context, session); - - invokehttp->incrementActiveTasks(); - invokehttp->setScheduledState(core::ScheduledState::RUNNING); - std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2); - invokehttp->onSchedule(context2, factory2); - invokehttp->onTrigger(context2, session2); - - provenance::ProvenanceReporter *reporter = session->getProvenanceReporter(); - std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents(); - record = session->get(); - REQUIRE(record == nullptr); - REQUIRE(records.size() == 0); - - listenhttp->incrementActiveTasks(); - listenhttp->setScheduledState(core::ScheduledState::RUNNING); - listenhttp->onTrigger(context, session); - - reporter = session->getProvenanceReporter(); - - records = reporter->getEvents(); - session->commit(); - - invokehttp->incrementActiveTasks(); - invokehttp->setScheduledState(core::ScheduledState::RUNNING); - invokehttp->onTrigger(context2, session2); - - session2->commit(); - records = reporter->getEvents(); - - for (provenance::ProvenanceEventRecord *provEventRecord : records) { - REQUIRE(provEventRecord->getComponentType() == listenhttp->getName()); - } - std::shared_ptr<core::FlowFile> ffr = session2->get(); - REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST")); - LogTestController::getInstance().reset(); -} - -class CallBack : public minifi::OutputStreamCallback { - public: - CallBack() { - } - virtual ~CallBack() { - } - virtual int64_t process(std::shared_ptr<minifi::io::BaseStream> stream) { - // leaving the typo for posterity sake - std::string st = "we're gnna write some test stuff"; - return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(st.c_str())), st.length()); - } -}; - -TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") { - TestController testController; - LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>(); - - std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); - - std::shared_ptr<core::Processor> getfileprocessor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); - - std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); - - char format[] = "/tmp/gt.XXXXXX"; - char *dir = testController.createTempDirectory(format); - - std::shared_ptr<core::Processor> listenhttp = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp"); - - std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp"); - uuid_t processoruuid; - REQUIRE(true == listenhttp->getUUID(processoruuid)); - - uuid_t invokehttp_uuid; - REQUIRE(true == invokehttp->getUUID(invokehttp_uuid)); - - std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); - - std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection"); - gcConnection->setRelationship(core::Relationship("success", "description")); - - std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute"); - laConnection->setRelationship(core::Relationship("success", "description")); - - std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection"); - connection->setRelationship(core::Relationship("success", "description")); - - std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp"); - - connection2->setRelationship(core::Relationship("No Retry", "description")); - - // link the connections so that we can test results at the end for this - connection->setSource(listenhttp); - - connection->setSourceUUID(invokehttp_uuid); - connection->setDestinationUUID(processoruuid); - - connection2->setSourceUUID(processoruuid); - connection2->setSourceUUID(processoruuid); - - listenhttp->addConnection(connection); - invokehttp->addConnection(connection); - invokehttp->addConnection(connection2); - - - std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(listenhttp); - std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(invokehttp); - std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; - std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); - std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo); - context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8680"); - context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest"); - - context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST"); - context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8680/testytesttest"); - auto session = std::make_shared<core::ProcessSession>(context); - auto session2 = std::make_shared<core::ProcessSession>(context2); - - REQUIRE(listenhttp->getName() == "listenhttp"); - - std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context); - - std::shared_ptr<core::FlowFile> record; - - CallBack callback; - - std::map<std::string, std::string> attributes; - attributes["testy"] = "test"; - std::shared_ptr<minifi::FlowFileRecord> flow = std::make_shared<minifi::FlowFileRecord>(repo, content_repo, attributes); - session2->write(flow, &callback); - - invokehttp->incrementActiveTasks(); - invokehttp->setScheduledState(core::ScheduledState::RUNNING); - std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2); - invokehttp->onSchedule(context2, factory2); - invokehttp->onTrigger(context2, session2); - - listenhttp->incrementActiveTasks(); - listenhttp->setScheduledState(core::ScheduledState::RUNNING); - listenhttp->onSchedule(context, factory); - listenhttp->onTrigger(context, session); - - provenance::ProvenanceReporter *reporter = session->getProvenanceReporter(); - std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents(); - record = session->get(); - REQUIRE(record == nullptr); - REQUIRE(records.size() == 0); - - listenhttp->incrementActiveTasks(); - listenhttp->setScheduledState(core::ScheduledState::RUNNING); - listenhttp->onTrigger(context, session); - - reporter = session->getProvenanceReporter(); - - records = reporter->getEvents(); - session->commit(); - - invokehttp->incrementActiveTasks(); - invokehttp->setScheduledState(core::ScheduledState::RUNNING); - invokehttp->onTrigger(context2, session2); - - session2->commit(); - records = reporter->getEvents(); - - for (provenance::ProvenanceEventRecord *provEventRecord : records) { - REQUIRE(provEventRecord->getComponentType() == listenhttp->getName()); - } - std::shared_ptr<core::FlowFile> ffr = session2->get(); - REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST")); - LogTestController::getInstance().reset(); -} - -TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") { - TestController testController; - LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>(); - LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::ListenHTTP>(); - LogTestController::getInstance().setInfo<core::Processor>(); - - std::shared_ptr<TestPlan> plan = testController.createPlan(); - std::shared_ptr<core::Processor> processor = plan->addProcessor("ListenHTTP", "listenhttp", core::Relationship("No Retry", "description"), false); - std::shared_ptr<core::Processor> invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp", core::Relationship("success", "description"), true); - - REQUIRE(true == plan->setProperty(processor, org::apache::nifi::minifi::processors::ListenHTTP::Port.getName(), "8685")); - REQUIRE(true == plan->setProperty(processor, org::apache::nifi::minifi::processors::ListenHTTP::BasePath.getName(), "/testytesttest")); - - REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::Method.getName(), "POST")); - REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::URL.getName(), "http://localhost:8685/testytesttest")); - plan->reset(); - testController.runSession(plan, true); - - std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords(); - std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile(); - REQUIRE(record == nullptr); - REQUIRE(records.size() == 0); - - plan->reset(); - testController.runSession(plan, true); - - records = plan->getProvenanceRecords(); - record = plan->getCurrentFlowFile(); - - for (provenance::ProvenanceEventRecord *provEventRecord : records) { - REQUIRE(provEventRecord->getComponentType() == processor->getName()); - } - std::shared_ptr<core::FlowFile> ffr = plan->getCurrentFlowFile(); - REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST")); - LogTestController::getInstance().reset(); -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/integration/IntegrationBase.h ---------------------------------------------------------------------- diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h index 7626cdf..8446993 100644 --- a/libminifi/test/integration/IntegrationBase.h +++ b/libminifi/test/integration/IntegrationBase.h @@ -18,37 +18,21 @@ #ifndef LIBMINIFI_TEST_INTEGRATION_INTEGRATIONBASE_H_ #define LIBMINIFI_TEST_INTEGRATION_INTEGRATIONBASE_H_ -#include "../TestServer.h" -#include "../include/core/logging/Logger.h" +#include "core/logging/Logger.h" #include "core/ProcessGroup.h" #include "core/yaml/YamlConfiguration.h" #include "FlowController.h" #include "properties/Configure.h" -#include "../unit/ProvenanceTestHelper.h" +#include "unit/ProvenanceTestHelper.h" #include "io/StreamFactory.h" -#include "CivetServer.h" #include "RemoteProcessorGroupPort.h" #include "core/ConfigurableComponent.h" #include "controllers/SSLContextService.h" -int log_message(const struct mg_connection *conn, const char *message) { - puts(message); - return 1; -} - -int ssl_enable(void *ssl_context, void *user_data) { - struct ssl_ctx_st *ctx = (struct ssl_ctx_st *) ssl_context; - return 0; -} - - - class IntegrationBase { public: IntegrationBase(); - void setUrl(std::string url, CivetHandler *handler); - virtual ~IntegrationBase(); void run(std::string test_file_location); @@ -75,23 +59,18 @@ class IntegrationBase { } void configureSecurity(); - CivetServer *server; std::shared_ptr<minifi::Configure> configuration; std::string port, scheme, path; std::string key_dir; }; -IntegrationBase::IntegrationBase() - : server(nullptr), - configuration(std::make_shared< - minifi::Configure>()) -{ +IntegrationBase::IntegrationBase() + : configuration(std::make_shared<minifi::Configure>()) { mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); } IntegrationBase::~IntegrationBase() { - stop_webserver(server); rmdir("./content_repository"); } @@ -157,28 +136,4 @@ void IntegrationBase::run(std::string test_file_location) { cleanup(); } -void IntegrationBase::setUrl(std::string url, CivetHandler *handler) { - - parse_http_components(url, port, scheme, path); - struct mg_callbacks callback; - if (url.find("localhost") != std::string::npos) { - - if (server != nullptr){ - server->addHandler(path,handler); - return; - } - if (scheme == "https" && !key_dir.empty()) { - std::string cert = ""; - cert = key_dir + "nifi-cert.pem"; - memset(&callback, 0, sizeof(callback)); - callback.init_ssl = ssl_enable; - port += "s"; - callback.log_message = log_message; - server = start_webserver(port, path, handler, &callback, cert, cert); - } else { - server = start_webserver(port, path, handler); - } - } -} - -#endif /* LIBMINIFI_TEST_INTEGRATION_INTEGRATIONBASE_H_ */ +#endif /* LIBMINIFI_TEST_INTEGRATION_INTEGRATIONBASE_H_ */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/pcap-tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libminifi/test/pcap-tests/CMakeLists.txt b/libminifi/test/pcap-tests/CMakeLists.txt index ba4d7d2..14b4528 100644 --- a/libminifi/test/pcap-tests/CMakeLists.txt +++ b/libminifi/test/pcap-tests/CMakeLists.txt @@ -24,6 +24,7 @@ SET(PCAP_INT_TEST_COUNT 0) FOREACH(testfile ${PCAP_TESTS}) get_filename_component(testfilename "${testfile}" NAME_WE) add_executable("${testfilename}" "${testfile}" ) + target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/libminifi/test/") target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/pcap/") target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_BINARY_DIR}/extensions/pcap/pcap++/Dist/header/") createTests("${testfilename}") http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/unit/ClassLoaderTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ClassLoaderTests.cpp b/libminifi/test/unit/ClassLoaderTests.cpp index 82825e9..ec3a470 100644 --- a/libminifi/test/unit/ClassLoaderTests.cpp +++ b/libminifi/test/unit/ClassLoaderTests.cpp @@ -25,7 +25,6 @@ TEST_CASE("TestLoader", "[TestLoader]") { TestController controller; REQUIRE(nullptr != core::ClassLoader::getDefaultClassLoader().instantiate("AppendHostInfo", "hosty")); - REQUIRE(nullptr != core::ClassLoader::getDefaultClassLoader().instantiate("ListenHTTP", "hosty2")); REQUIRE(nullptr == core::ClassLoader::getDefaultClassLoader().instantiate("Don'tExist", "hosty3")); REQUIRE(nullptr == core::ClassLoader::getDefaultClassLoader().instantiate("", "EmptyEmpty")); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/unit/GetTCPTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/GetTCPTests.cpp b/libminifi/test/unit/GetTCPTests.cpp index 5e47dff..60db1ee 100644 --- a/libminifi/test/unit/GetTCPTests.cpp +++ b/libminifi/test/unit/GetTCPTests.cpp @@ -25,7 +25,6 @@ #include "../unit/ProvenanceTestHelper.h" #include "../TestBase.h" #include "Scheduling.h" -#include "processors/ListenHTTP.h" #include "processors/LogAttribute.h" #include "processors/GetTCP.h" #include "core/Core.h" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/libminifi/test/unit/ProcessorTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/unit/ProcessorTests.cpp b/libminifi/test/unit/ProcessorTests.cpp index 2ae7553..d041ec4 100644 --- a/libminifi/test/unit/ProcessorTests.cpp +++ b/libminifi/test/unit/ProcessorTests.cpp @@ -24,7 +24,6 @@ #include <fstream> #include "../TestBase.h" -#include "processors/ListenHTTP.h" #include "processors/LogAttribute.h" #include "processors/GetFile.h" #include "../unit/ProvenanceTestHelper.h"
