MINIFICPP-251 Move Civet implementations to an extension. This closes #203.
Signed-off-by: Bin Qiu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/0981f9ac Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/0981f9ac Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/0981f9ac Branch: refs/heads/master Commit: 0981f9acf9243886015aed503f01a5c3bd300b0c Parents: fb8573f Author: Caleb Johnson <[email protected]> Authored: Thu Nov 16 23:06:22 2017 +0000 Committer: Bin Qiu <[email protected]> Committed: Mon Jan 8 10:25:00 2018 -0800 ---------------------------------------------------------------------- CMakeLists.txt | 14 +- cmake/BuildTests.cmake | 4 +- .../civet_curl_tests/C2NullConfiguration.cpp | 137 ++++++++ extensions/civet_curl_tests/C2UpdateTest.cpp | 183 ++++++++++ .../C2VerifyHeartbeatAndStop.cpp | 156 +++++++++ .../civet_curl_tests/C2VerifyServeResults.cpp | 131 ++++++++ extensions/civet_curl_tests/CMakeLists.txt | 82 +++++ .../ControllerServiceIntegrationTests.cpp | 160 +++++++++ extensions/civet_curl_tests/GetFileNoData.cpp | 184 ++++++++++ .../civet_curl_tests/HTTPSiteToSiteTests.cpp | 262 +++++++++++++++ .../civet_curl_tests/HttpGetIntegrationTest.cpp | 162 +++++++++ .../HttpPostIntegrationTest.cpp | 114 +++++++ .../civet_curl_tests/SiteToSiteRestTest.cpp | 145 ++++++++ .../civet_curl_tests/ThreadPoolAdjust.cpp | 115 +++++++ .../civet_curl_tests/include/TestServer.h | 117 +++++++ .../include/integration/HTTPIntegrationBase.h | 75 +++++ .../include/sitetositehttp/HTTPHandlers.h | 320 ++++++++++++++++++ extensions/civet_curl_tests/unit/CMakeLists.txt | 76 +++++ .../civet_curl_tests/unit/InvokeHTTPTests.cpp | 315 ++++++++++++++++++ extensions/civetweb/CMakeLists.txt | 70 ++++ extensions/civetweb/CivetLoader.cpp | 29 ++ extensions/civetweb/ListenHTTP.cpp | 333 +++++++++++++++++++ extensions/civetweb/RESTReceiver.cpp | 147 ++++++++ extensions/civetweb/include/CivetLoader.h | 70 ++++ 
.../include/c2/protocols/RESTReceiver.h | 110 ++++++ extensions/civetweb/include/io/CivetStream.h | 138 ++++++++ .../civetweb/include/processors/ListenHTTP.h | 121 +++++++ extensions/http-curl/CMakeLists.txt | 4 +- extensions/http-curl/HTTPCurlLoader.h | 10 +- extensions/http-curl/RESTSender.cpp | 140 ++++++++ extensions/http-curl/c2/protocols/RESTSender.h | 80 +++++ extensions/http-curl/protocols/RESTProtocol.cpp | 177 ---------- extensions/http-curl/protocols/RESTProtocol.h | 75 ----- extensions/http-curl/protocols/RESTReceiver.cpp | 147 -------- extensions/http-curl/protocols/RESTReceiver.h | 111 ------- extensions/http-curl/protocols/RESTSender.cpp | 140 -------- extensions/http-curl/protocols/RESTSender.h | 81 ----- extensions/libarchive/CMakeLists.txt | 2 +- extensions/rocksdb-repos/CMakeLists.txt | 2 +- extensions/script/CMakeLists.txt | 2 +- libminifi/CMakeLists.txt | 3 +- libminifi/include/c2/protocols/RESTProtocol.h | 74 +++++ libminifi/include/core/FlowConfiguration.h | 1 + libminifi/include/processors/ListenHTTP.h | 121 ------- libminifi/src/c2/protocols/RESTProtocol.cpp | 177 ++++++++++ libminifi/src/processors/ListenHTTP.cpp | 333 ------------------- libminifi/test/TestServer.h | 117 ------- .../test/curl-tests/C2NullConfiguration.cpp | 135 -------- libminifi/test/curl-tests/C2UpdateTest.cpp | 183 ---------- .../curl-tests/C2VerifyHeartbeatAndStop.cpp | 155 --------- .../test/curl-tests/C2VerifyServeResults.cpp | 131 -------- libminifi/test/curl-tests/CMakeLists.txt | 76 ----- .../ControllerServiceIntegrationTests.cpp | 160 --------- libminifi/test/curl-tests/GetFileNoData.cpp | 184 ---------- .../test/curl-tests/HTTPSiteToSiteTests.cpp | 262 --------------- .../test/curl-tests/HttpGetIntegrationTest.cpp | 162 --------- .../test/curl-tests/HttpPostIntegrationTest.cpp | 114 ------- .../test/curl-tests/SiteToSiteRestTest.cpp | 145 -------- libminifi/test/curl-tests/ThreadPoolAdjust.cpp | 114 ------- .../curl-tests/sitetositehttp/CivetStream.h | 138 
-------- .../curl-tests/sitetositehttp/HTTPHandlers.h | 321 ------------------ .../test/curl-tests/unit/InvokeHTTPTests.cpp | 314 ----------------- libminifi/test/integration/IntegrationBase.h | 55 +-- libminifi/test/pcap-tests/CMakeLists.txt | 1 + libminifi/test/unit/ClassLoaderTests.cpp | 1 - libminifi/test/unit/GetTCPTests.cpp | 1 - libminifi/test/unit/ProcessorTests.cpp | 1 - libminifi/test/unit/PutFileTests.cpp | 1 - main/CMakeLists.txt | 2 +- 69 files changed, 4254 insertions(+), 3969 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index c7c4d12..e6f89a8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -116,10 +116,10 @@ set(CIVETWEB_ENABLE_CXX ON CACHE BOOL "Enable civet C++ library") SET(WITH_TOOLS OFF CACHE BOOL "Do not build RocksDB tools") SET(WITH_TESTS OFF CACHE BOOL "Build RocksDB library (not repo) tests") set(CIVET_THIRDPARTY_ROOT "${CMAKE_SOURCE_DIR}/thirdparty/civetweb-1.10/" CACHE STRING "Path to CivetWeb root") +set(CIVET_BINARY_ROOT "${CMAKE_BINARY_DIR}/thirdparty/civetweb-1.10/" CACHE STRING "Path to CivetWeb binary output") set(ROCKSDB_THIRDPARTY_ROOT "${CMAKE_SOURCE_DIR}/thirdparty/rocksdb/" CACHE STRING "Path to RocksDB root") add_subdirectory(thirdparty/yaml-cpp-yaml-cpp-20171024) -set(BUILD_CIVET_TESTING OFF) -add_subdirectory("${CIVET_THIRDPARTY_ROOT}" EXCLUDE_FROM_ALL) + include_directories(thirdparty/concurrentqueue) include_directories(thirdparty/yaml-cpp-yaml-cpp-20171024/include) @@ -140,7 +140,12 @@ add_subdirectory(libminifi) #### EXTENSION option(DISABLE_CURL "Disables libCurl Properties." 
OFF) if (NOT DISABLE_CURL) - createExtension(HTTP-CURL "HTTP CURL" "This enables RESTProtocol, InvokeHTTP, and the HTTPClient for Site to Site" "extensions/http-curl" "${TEST_DIR}/curl-tests") + createExtension(HTTP-CURL "HTTP CURL" "This enables RESTProtocol, InvokeHTTP, and the HTTPClient for Site to Site" "extensions/http-curl") +endif() + +option(DISABLE_CIVET "Disables CivetWeb components." OFF) +if (NOT DISABLE_CIVET) +createExtension(CIVETWEB CIVETWEB "This enables ListenHTTP and several cURL tests" "extensions/civetweb") endif() ## Add the rocks DB extension @@ -250,6 +255,9 @@ include(CPack) if (NOT SKIP_TESTS) include(BuildTests) + if (NOT (DISABLE_CURL OR DISABLE_CIVET)) + add_subdirectory(extensions/civet_curl_tests) + endif() endif() include(BuildDocs) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/cmake/BuildTests.cmake ---------------------------------------------------------------------- diff --git a/cmake/BuildTests.cmake b/cmake/BuildTests.cmake index 4499305..1f6fc8a 100644 --- a/cmake/BuildTests.cmake +++ b/cmake/BuildTests.cmake @@ -39,7 +39,7 @@ function(createTests testName) target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/spdlog-20170710/include") target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/yaml-cpp-yaml-cpp-0.5.3/include") target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/jsoncpp/include") - target_include_directories(${testName} BEFORE PRIVATE "${CIVET_THIRDPARTY_ROOT}/include") + target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/thirdparty/libarchive-3.3.2/libarchive") target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/include") target_include_directories(${testName} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/include/") @@ -59,7 +59,7 @@ function(createTests testName) target_include_directories(${testName} BEFORE PRIVATE "${Boost_INCLUDE_DIRS}") endif() 
target_link_libraries(${testName} ${SPD_LIB} ${TEST_BASE_LIB}) - target_link_libraries(${testName} ${CMAKE_THREAD_LIBS_INIT} ${OPENSSL_LIBRARIES} core-minifi yaml-cpp c-library civetweb-cpp ${JSON_CPP_LIB} dl) + target_link_libraries(${testName} ${CMAKE_THREAD_LIBS_INIT} ${OPENSSL_LIBRARIES} core-minifi yaml-cpp ${JSON_CPP_LIB}) if (APPLE) target_link_libraries (${testName} -Wl,-all_load minifi) else () http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/C2NullConfiguration.cpp ---------------------------------------------------------------------- diff --git a/extensions/civet_curl_tests/C2NullConfiguration.cpp b/extensions/civet_curl_tests/C2NullConfiguration.cpp new file mode 100644 index 0000000..c68e047 --- /dev/null +++ b/extensions/civet_curl_tests/C2NullConfiguration.cpp @@ -0,0 +1,137 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "HTTPClient.h" +#include "InvokeHTTP.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "CivetServer.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "controllers/SSLContextService.h" +#include "TestServer.h" +#include "c2/protocols/RESTReceiver.h" +#include "c2/protocols/RESTSender.h" +#include "c2/C2Agent.h" +#include "c2/protocols/RESTReceiver.h" +#include "processors/LogAttribute.h" +#include "integration/HTTPIntegrationBase.h" + +class VerifyC2Server : public HTTPIntegrationBase { + public: + explicit VerifyC2Server(bool isSecure) + : isSecure(isSecure) { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); + } + + void testSetup() { + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setDebug<processors::InvokeHTTP>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>(); + LogTestController::getInstance().setDebug<minifi::c2::C2Agent>(); + LogTestController::getInstance().setDebug<processors::LogAttribute>(); + LogTestController::getInstance().setDebug<minifi::core::ProcessSession>(); + std::fstream file; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + } + + void cleanup() { + unlink(ss.str().c_str()); + } + + void runAssertions() { + assert(LogTestController::getInstance().contains("C2Agent] 
[info] Class is null") == true); + assert(LogTestController::getInstance().contains("C2Agent] [debug] Could not instantiate null") == true); + assert(LogTestController::getInstance().contains("Class is RESTSender") == true); + } + + void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) { + std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke"); + assert(proc != nullptr); + + std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc); + + assert(inv != nullptr); + std::string url = ""; + inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url); + + + std::string port, scheme, path; + parse_http_components(url, port, scheme, path); + configuration->set("c2.agent.protocol.class", "null"); + configuration->set("c2.rest.url", ""); + configuration->set("c2.rest.url.ack", ""); + configuration->set("c2.agent.heartbeat.reporter.classes", "null"); + configuration->set("c2.rest.listener.port", "null"); + configuration->set("c2.agent.heartbeat.period", "null"); + configuration->set("c2.rest.listener.heartbeat.rooturi", "null"); + } + + protected: + bool isSecure; + char *dir; + std::stringstream ss; + TestController testController; +}; + +int main(int argc, char **argv) { + std::string key_dir, test_file_location, url; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + } + + bool isSecure = false; + if (url.find("https") != std::string::npos) { + isSecure = true; + } + + VerifyC2Server harness(isSecure); + + harness.setKeyDir(key_dir); + + harness.run(test_file_location); + + return 0; +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/C2UpdateTest.cpp ---------------------------------------------------------------------- diff --git a/extensions/civet_curl_tests/C2UpdateTest.cpp b/extensions/civet_curl_tests/C2UpdateTest.cpp new file mode 100644 index 0000000..edc92f0 --- /dev/null +++ 
b/extensions/civet_curl_tests/C2UpdateTest.cpp @@ -0,0 +1,183 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "HTTPClient.h" +#include "InvokeHTTP.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "c2/C2Agent.h" +#include "CivetServer.h" +#include <cstring> +#include "c2/protocols/RESTSender.h" + +void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(10)); +} + +static std::vector<std::string> responses; + +class ConfigHandler : public CivetHandler { + public: + ConfigHandler() { + calls_ = 0; + } + bool handlePost(CivetServer *server, struct mg_connection *conn) { + calls_++; + if (responses.size() > 0) { + std::string top_str = 
responses.back(); + responses.pop_back(); + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + top_str.length()); + mg_printf(conn, "%s", top_str.c_str()); + } else { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); + } + + return true; + } + + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::ifstream myfile(test_file_location_.c_str()); + + if (myfile.is_open()) { + std::stringstream buffer; + buffer << myfile.rdbuf(); + std::string str = buffer.str(); + myfile.close(); + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + str.length()); + mg_printf(conn, "%s", str.c_str()); + } else { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); + } + + return true; + } + std::string test_file_location_; + std::atomic<size_t> calls_; +}; + +int main(int argc, char **argv) { + mg_init_library(0); + LogTestController::getInstance().setInfo<minifi::FlowController>(); + LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); + LogTestController::getInstance().setDebug<minifi::c2::C2Agent>(); + + const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 }; + std::vector<std::string> cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + + CivetServer server(cpp_options); + ConfigHandler h_ex; + server.addHandler("/update", h_ex); + std::string key_dir, test_file_location; + if (argc > 1) { + h_ex.test_file_location_ = test_file_location = argv[1]; + key_dir = argv[2]; + } + std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" + "\"operation\" : \"update\", " + "\"operationid\" : \"8675309\", " + "\"name\": \"configuration\"" + "}]}"; + + 
responses.push_back(heartbeat_response); + + std::ifstream myfile(test_file_location.c_str()); + + if (myfile.is_open()) { + std::stringstream buffer; + buffer << myfile.rdbuf(); + std::string str = buffer.str(); + myfile.close(); + std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" + "\"operation\" : \"update\", " + "\"operationid\" : \"8675309\", " + "\"name\": \"configuration\", \"content\": { \"location\": \"http://localhost:9090/update\"}}]}"; + responses.push_back(response); + } + + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); + + configuration->set("c2.rest.url", "http://localhost:9090/update"); + configuration->set("c2.agent.heartbeat.period", "1000"); + mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + + std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); + std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>(); + + configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); + + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( + new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); + + std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, + true); + + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); + + 
std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get()); + ptr.release(); + auto start = std::chrono::system_clock::now(); + + controller->load(); + controller->start(); + waitToVerifyProcessor(); + + controller->waitUnload(60000); + auto then = std::chrono::system_clock::now(); + + auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count(); + std::string logs = LogTestController::getInstance().log_output.str(); + assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos); + LogTestController::getInstance().reset(); + rmdir("./content_repository"); + assert(h_ex.calls_ <= (milliseconds / 1000) + 1); + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/C2VerifyHeartbeatAndStop.cpp ---------------------------------------------------------------------- diff --git a/extensions/civet_curl_tests/C2VerifyHeartbeatAndStop.cpp b/extensions/civet_curl_tests/C2VerifyHeartbeatAndStop.cpp new file mode 100644 index 0000000..f447d69 --- /dev/null +++ b/extensions/civet_curl_tests/C2VerifyHeartbeatAndStop.cpp @@ -0,0 +1,156 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "HTTPClient.h" +#include "InvokeHTTP.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "CivetServer.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "controllers/SSLContextService.h" +#include "TestServer.h" +#include "c2/C2Agent.h" +#include "c2/protocols/RESTReceiver.h" +#include "c2/protocols/RESTSender.h" +#include "integration/HTTPIntegrationBase.h" +#include "processors/LogAttribute.h" + +class Responder : public CivetHandler { + public: + explicit Responder(bool isSecure) + : isSecure(isSecure) { + } + bool handlePost(CivetServer *server, struct mg_connection *conn) { + std::string resp = + "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"name\" : \"invoke\" }, " + "{ \"operationid\" : 42, \"operation\" : \"stop\", \"name\" : \"FlowController\" } ]}"; + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + resp.length()); + 
mg_printf(conn, "%s", resp.c_str()); + return true; + } + + protected: + bool isSecure; +}; + +class VerifyC2Heartbeat : public HTTPIntegrationBase { + public: + explicit VerifyC2Heartbeat(bool isSecure) + : isSecure(isSecure) { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); + } + + void testSetup() { + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setTrace<minifi::c2::C2Agent>(); + LogTestController::getInstance().setDebug<LogTestController>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTProtocol>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>(); + std::fstream file; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + } + + void cleanup() { + LogTestController::getInstance().reset(); + unlink(ss.str().c_str()); + } + + void runAssertions() { + assert(LogTestController::getInstance().contains("Received Ack from Server") == true); + + assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component invoke") == true); + + assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component FlowController") == true); + } + + void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) { + std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke"); + assert(proc != nullptr); + + std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc); + + assert(inv != nullptr); + std::string url = ""; + inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url); + + configuration->set("c2.rest.url", "http://localhost:8888/api/heartbeat"); + configuration->set("c2.agent.heartbeat.period", "1000"); + configuration->set("c2.rest.url.ack", "http://localhost:8888/api/heartbeat"); + } + + protected: + 
bool isSecure; + char *dir; + std::stringstream ss; + TestController testController; +}; + +int main(int argc, char **argv) { + std::string key_dir, test_file_location, url; + url = "http://localhost:8888/api/heartbeat"; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + } + + bool isSecure = false; + if (url.find("https") != std::string::npos) { + isSecure = true; + } + + VerifyC2Heartbeat harness(isSecure); + + harness.setKeyDir(key_dir); + + Responder responder(isSecure); + + harness.setUrl(url, &responder); + + harness.run(test_file_location); + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/C2VerifyServeResults.cpp ---------------------------------------------------------------------- diff --git a/extensions/civet_curl_tests/C2VerifyServeResults.cpp b/extensions/civet_curl_tests/C2VerifyServeResults.cpp new file mode 100644 index 0000000..d99a2d8 --- /dev/null +++ b/extensions/civet_curl_tests/C2VerifyServeResults.cpp @@ -0,0 +1,131 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "HTTPClient.h" +#include "processors/InvokeHTTP.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "CivetServer.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "controllers/SSLContextService.h" +#include "TestServer.h" +#include "c2/C2Agent.h" +#include "c2/protocols/RESTReceiver.h" +#include "integration/HTTPIntegrationBase.h" +#include "processors/LogAttribute.h" + +class VerifyC2Server : public HTTPIntegrationBase { + public: + explicit VerifyC2Server(bool isSecure) + : isSecure(isSecure) { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); + } + + void testSetup() { + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setDebug<processors::InvokeHTTP>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>(); + LogTestController::getInstance().setDebug<minifi::c2::C2Agent>(); + LogTestController::getInstance().setDebug<processors::LogAttribute>(); + LogTestController::getInstance().setDebug<minifi::core::ProcessSession>(); + std::fstream file; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + } + + void cleanup() { + unlink(ss.str().c_str()); + } + + void runAssertions() { + assert(LogTestController::getInstance().contains("Import offset 0") == true); + + 
assert(LogTestController::getInstance().contains("Outputting success and response") == true); + } + + void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) { + std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke"); + assert(proc != nullptr); + + std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc); + + assert(inv != nullptr); + std::string url = ""; + inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url); + + + std::string port, scheme, path; + parse_http_components(url, port, scheme, path); + configuration->set("c2.agent.heartbeat.reporter.classes", "RESTReceiver"); + configuration->set("c2.rest.listener.port", port); + configuration->set("c2.agent.heartbeat.period", "10"); + configuration->set("c2.rest.listener.heartbeat.rooturi", path); + } + + protected: + bool isSecure; + char *dir; + std::stringstream ss; + TestController testController; +}; + +int main(int argc, char **argv) { + std::string key_dir, test_file_location, url; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + } + + bool isSecure = false; + if (url.find("https") != std::string::npos) { + isSecure = true; + } + + VerifyC2Server harness(isSecure); + + harness.setKeyDir(key_dir); + + harness.run(test_file_location); + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/extensions/civet_curl_tests/CMakeLists.txt b/extensions/civet_curl_tests/CMakeLists.txt new file mode 100644 index 0000000..727c0ab --- /dev/null +++ b/extensions/civet_curl_tests/CMakeLists.txt @@ -0,0 +1,82 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Gather the test sources: unit tests live in unit/ and are linked against
+# ${CATCH_MAIN_LIB}; every .cpp directly in this directory is built as a
+# stand-alone integration-test executable.
+file(GLOB CURL_UNIT_TESTS "unit/*.cpp")
+file(GLOB CURL_INTEGRATION_TESTS "*.cpp")
+
+# Running total of test executables configured below. Both loops increment it,
+# so the final message counts unit and integration tests together.
+SET(CURL_INT_TEST_COUNT 0)
+
+# One executable per unit-test source, named after the source file
+# (NAME_WE drops the directory and the extension).
+FOREACH(testfile ${CURL_UNIT_TESTS})
+  get_filename_component(testfilename "${testfile}" NAME_WE)
+  add_executable("${testfilename}" "${testfile}")
+  target_include_directories(${testfilename} BEFORE PRIVATE ${CURL_INCLUDE_DIRS})
+  target_include_directories(${testfilename} BEFORE PRIVATE "${CIVET_THIRDPARTY_ROOT}/include")
+  target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test/")
+  target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/")
+  target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/client/")
+  target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/processors/")
+  target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/protocols/")
+  target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/sitetosite/")
+  target_include_directories(${testfilename} BEFORE PRIVATE "../civetweb/include/")
+  target_include_directories(${testfilename} BEFORE PRIVATE ./include)
+  createTests("${testfilename}")
+  target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
+  # -all_load / --whole-archive ask the linker to keep every object of the two
+  # extension libraries. Presumably this keeps classes that are only registered
+  # through static initializers from being dropped -- confirm against the
+  # extension loader.
+  if (APPLE)
+    target_link_libraries ("${testfilename}" -Wl,-all_load minifi-http-curl minifi-civet-extensions)
+  else ()
+    target_link_libraries ("${testfilename}" -Wl,--whole-archive minifi-http-curl minifi-civet-extensions -Wl,--no-whole-archive)
+  endif()
+  MATH(EXPR CURL_INT_TEST_COUNT "${CURL_INT_TEST_COUNT}+1")
+ENDFOREACH()
+
+# Same recipe for the integration tests, minus the ${CATCH_MAIN_LIB} link:
+# each of these sources provides its own main().
+FOREACH(testfile ${CURL_INTEGRATION_TESTS})
+  get_filename_component(testfilename "${testfile}" NAME_WE)
+  add_executable("${testfilename}" "${testfile}")
+  target_include_directories(${testfilename} BEFORE PRIVATE ${CURL_INCLUDE_DIRS})
+  target_include_directories(${testfilename} BEFORE PRIVATE "${CIVET_THIRDPARTY_ROOT}/include")
+  target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test/")
+  target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/")
+  target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/client/")
+  target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/processors/")
+  target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/protocols/")
+  target_include_directories(${testfilename} BEFORE PRIVATE "../http-curl/sitetosite/")
+  target_include_directories(${testfilename} BEFORE PRIVATE "../civetweb/include/")
+  target_include_directories(${testfilename} BEFORE PRIVATE ./include)
+  createTests("${testfilename}")
+  if (APPLE)
+    target_link_libraries ("${testfilename}" -Wl,-all_load minifi-http-curl minifi-civet-extensions)
+  else ()
+    target_link_libraries ("${testfilename}" -Wl,--whole-archive minifi-http-curl minifi-civet-extensions -Wl,--no-whole-archive)
+  endif()
+  MATH(EXPR CURL_INT_TEST_COUNT "${CURL_INT_TEST_COUNT}+1")
+ENDFOREACH()
+
+message("-- Finished building ${CURL_INT_TEST_COUNT} libcURL integration test file(s)...")
+
+# Register the integration tests with CTest. Every command receives the flow
+# YAML as its first argument and the resources directory as its second;
+# SiteToSiteRestTest additionally receives the REST endpoint URL.
+# HttpGetIntegrationTestSecure and HttpPostIntegrationTestChunked re-run an
+# existing executable against a different YAML.
+# NOTE(review): the glob above also matches GetFileNoData.cpp and
+# HTTPSiteToSiteTests.cpp, but neither has an add_test() entry here -- confirm
+# whether that is intentional.
+add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestNull.yml" "${TEST_RESOURCES}/")
+add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGetSecure.yml" "${TEST_RESOURCES}/")
+add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" "${TEST_RESOURCES}/")
+add_test(NAME HttpPostIntegrationTestChunked COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/C2VerifyServeResults.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/C2VerifyHeartbeatAndStop.yml" "${TEST_RESOURCES}/")
+add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8077/nifi-api/site-to-site")
+add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegrationTests "${TEST_RESOURCES}/TestControllerServices.yml" "${TEST_RESOURCES}/")
+add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/")
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/ControllerServiceIntegrationTests.cpp
----------------------------------------------------------------------
diff --git a/extensions/civet_curl_tests/ControllerServiceIntegrationTests.cpp b/extensions/civet_curl_tests/ControllerServiceIntegrationTests.cpp
new file mode 100644
index 0000000..612603a
--- /dev/null
+++ b/extensions/civet_curl_tests/ControllerServiceIntegrationTests.cpp
@@ -0,0 +1,160 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#undef NDEBUG +#include <cassert> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <utility> +#include <thread> +#include <type_traits> +#include <vector> + +#include "core/controller/ControllerServiceMap.h" +#include "core/controller/StandardControllerServiceNode.h" +#include "core/controller/StandardControllerServiceProvider.h" +#include "controllers/SSLContextService.h" +#include "core/Core.h" +#include "core/logging/LoggerConfiguration.h" +#include "core/ProcessGroup.h" +#include "core/Resource.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/MockClasses.h" +#include "unit/ProvenanceTestHelper.h" + +REGISTER_RESOURCE(MockControllerService); +REGISTER_RESOURCE(MockProcessor); + +std::shared_ptr<core::controller::StandardControllerServiceNode> newCsNode(std::shared_ptr<core::controller::ControllerServiceProvider> provider, const std::string id) { + std::shared_ptr<core::controller::ControllerService> service = std::make_shared<MockControllerService>(); + std::shared_ptr<core::controller::StandardControllerServiceNode> testNode = std::make_shared<core::controller::StandardControllerServiceNode>(service, provider, id, + std::make_shared<minifi::Configure>()); + return testNode; +} + +void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(2)); 
+} + +int main(int argc, char **argv) { + std::string test_file_location; + std::string key_dir; + + if (argc > 2) { + test_file_location = argv[1]; + key_dir = argv[1]; + } + + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); + + std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); + std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>(); + + configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); + std::string client_cert = "cn.crt.pem"; + std::string priv_key_file = "cn.ckey.pem"; + std::string passphrase = "cn.pass"; + std::string ca_cert = "nifi-cert.pem"; + configuration->set(minifi::Configure::nifi_security_client_certificate, test_file_location); + configuration->set(minifi::Configure::nifi_security_client_private_key, priv_key_file); + configuration->set(minifi::Configure::nifi_security_client_pass_phrase, passphrase); + configuration->set(minifi::Configure::nifi_default_directory, key_dir); + + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + content_repo->initialize(configuration); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( + new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); + + std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), + content_repo, + DEFAULT_ROOT_GROUP_NAME, + true); + + disabled = false; + std::shared_ptr<core::controller::ControllerServiceMap> map = 
std::make_shared<core::controller::ControllerServiceMap>(); + + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); + + std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get()); + ptr.release(); + + std::shared_ptr<core::controller::StandardControllerServiceProvider> provider = std::make_shared<core::controller::StandardControllerServiceProvider>(map, pg, std::make_shared<minifi::Configure>()); + std::shared_ptr<core::controller::ControllerServiceNode> mockNode = pg->findControllerService("MockItLikeIts1995"); + assert(mockNode != nullptr); + mockNode->enable(); + std::vector<std::shared_ptr<core::controller::ControllerServiceNode> > linkedNodes = mockNode->getLinkedControllerServices(); + assert(linkedNodes.size() == 1); + + std::shared_ptr<core::controller::ControllerServiceNode> notexistNode = pg->findControllerService("MockItLikeItsWrong"); + assert(notexistNode == nullptr); + + std::shared_ptr<core::controller::ControllerServiceNode> ssl_client_cont = nullptr; + std::shared_ptr<minifi::controllers::SSLContextService> ssl_client = nullptr; + { + std::lock_guard<std::mutex> lock(control_mutex); + controller->load(); + controller->start(); + ssl_client_cont = controller->getControllerServiceNode("SSLClientServiceTest"); + ssl_client_cont->enable(); + assert(ssl_client_cont != nullptr); + assert(ssl_client_cont->getControllerServiceImplementation() != nullptr); + ssl_client = std::static_pointer_cast<minifi::controllers::SSLContextService>(ssl_client_cont->getControllerServiceImplementation()); + } + assert(ssl_client->getCACertificate().length() > 0); + // now let's disable one of the controller services. 
+ std::shared_ptr<core::controller::ControllerServiceNode> cs_id = controller->getControllerServiceNode("ID"); + assert(cs_id != nullptr); + { + std::lock_guard<std::mutex> lock(control_mutex); + controller->disableControllerService(cs_id); + disabled = true; + waitToVerifyProcessor(); + } + { + std::lock_guard<std::mutex> lock(control_mutex); + controller->enableControllerService(cs_id); + disabled = false; + waitToVerifyProcessor(); + } + std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = controller->getControllerServiceNode("MockItLikeIts1995"); + assert(cs_id->enabled()); +{ + std::lock_guard<std::mutex> lock(control_mutex); + controller->disableReferencingServices(mock_cont); + disabled = true; + waitToVerifyProcessor(); + } + assert(cs_id->enabled() == false); +{ + std::lock_guard<std::mutex> lock(control_mutex); + controller->enableReferencingServices(mock_cont); + disabled = false; + waitToVerifyProcessor(); + } + assert(cs_id->enabled() == true); + + controller->waitUnload(60000); + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/GetFileNoData.cpp ---------------------------------------------------------------------- diff --git a/extensions/civet_curl_tests/GetFileNoData.cpp b/extensions/civet_curl_tests/GetFileNoData.cpp new file mode 100644 index 0000000..f475f48 --- /dev/null +++ b/extensions/civet_curl_tests/GetFileNoData.cpp @@ -0,0 +1,184 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "c2/C2Agent.h" +#include "CivetServer.h" +#include <cstring> +#include "c2/protocols/RESTSender.h" + +void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(10)); +} + +static std::vector<std::string> responses; + +class ConfigHandler : public CivetHandler { + public: + bool handlePost(CivetServer *server, struct mg_connection *conn) { + if (responses.size() > 0) { + std::string top_str = responses.back(); + responses.pop_back(); + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + top_str.length()); + mg_printf(conn, "%s", top_str.c_str()); + } else { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); + } + + return true; + } + + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::ifstream myfile(test_file_location_.c_str()); + + if (myfile.is_open()) { + std::stringstream buffer; + buffer << myfile.rdbuf(); + std::string str = 
buffer.str(); + myfile.close(); + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + str.length()); + mg_printf(conn, "%s", str.c_str()); + } else { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); + } + + return true; + } + std::string test_file_location_; +}; + +int main(int argc, char **argv) { + mg_init_library(0); + LogTestController::getInstance().setInfo<minifi::FlowController>(); + LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>(); + LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); + LogTestController::getInstance().setDebug<minifi::c2::C2Agent>(); + + const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 }; + std::vector<std::string> cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + + CivetServer server(cpp_options); + ConfigHandler h_ex; + server.addHandler("/update", h_ex); + std::string key_dir, test_file_location; + if (argc > 1) { + h_ex.test_file_location_ = test_file_location = argv[1]; + key_dir = argv[2]; + } + std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" + "\"operation\" : \"update\", " + "\"operationid\" : \"8675309\", " + "\"name\": \"configuration\"" + "}]}"; + + responses.push_back(heartbeat_response); + + std::ifstream myfile(test_file_location.c_str()); + + if (myfile.is_open()) { + std::stringstream buffer; + buffer << myfile.rdbuf(); + std::string str = buffer.str(); + myfile.close(); + std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" + "\"operation\" : \"update\", " + "\"operationid\" : \"8675309\", " + "\"name\": \"configuration\", \"content\": { \"location\": \"http://localhost:9090/update\"}}]}"; + responses.push_back(response); + } + + std::shared_ptr<minifi::Configure> configuration = std::make_shared< + 
minifi::Configure>(); + + configuration->set("c2.rest.url", + "http://localhost:9090/update"); + mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + + std::shared_ptr<core::Repository> test_repo = + std::make_shared<TestRepository>(); + std::shared_ptr<core::Repository> test_flow_repo = std::make_shared< + TestFlowRepository>(); + + configuration->set(minifi::Configure::nifi_flow_configuration_file, + test_file_location); + + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared + <minifi::io::StreamFactory>(configuration); + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr + <core::YamlConfiguration + >(new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, + configuration, + test_file_location)); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast + <TestRepository>(test_repo); + + std::shared_ptr<minifi::FlowController> controller = + std::make_shared<minifi::FlowController + >(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, true); + + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, + configuration, + test_file_location); + + std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot( + test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup + >(ptr.get()); + ptr.release(); + + controller->load(); + controller->start(); + waitToVerifyProcessor(); + + controller->waitUnload(60000); + std::string logs = LogTestController::getInstance().log_output.str(); + assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos); + LogTestController::getInstance().reset(); + + return 0; +} 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/HTTPSiteToSiteTests.cpp ---------------------------------------------------------------------- diff --git a/extensions/civet_curl_tests/HTTPSiteToSiteTests.cpp b/extensions/civet_curl_tests/HTTPSiteToSiteTests.cpp new file mode 100644 index 0000000..01d7231 --- /dev/null +++ b/extensions/civet_curl_tests/HTTPSiteToSiteTests.cpp @@ -0,0 +1,262 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#define CURLOPT_SSL_VERIFYPEER_DISABLE 1 +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "HTTPClient.h" +#include "CivetServer.h" +#include "sitetosite/HTTPProtocol.h" +#include "InvokeHTTP.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "io/StreamFactory.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "TestServer.h" +#include "integration/HTTPIntegrationBase.h" +#include "sitetositehttp/HTTPHandlers.h" +#include "client/HTTPStream.h" + +class SiteToSiteTestHarness : public HTTPIntegrationBase { + public: + explicit SiteToSiteTestHarness(bool isSecure) + : isSecure(isSecure) { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); + } + + void testSetup() { + LogTestController::getInstance().setDebug<minifi::RemoteProcessorGroupPort>(); + LogTestController::getInstance().setDebug<minifi::sitetosite::HttpSiteToSiteClient>(); + LogTestController::getInstance().setDebug<minifi::sitetosite::SiteToSiteClient>(); + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setTrace<minifi::controllers::SSLContextService>(); + LogTestController::getInstance().setInfo<minifi::FlowController>(); + LogTestController::getInstance().setDebug<core::ConfigurableComponent>(); + + std::fstream file; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + + configuration->set("nifi.c2.enable", "false"); + configuration->set("nifi.remote.input.http.enabled", "true"); + 
configuration->set("nifi.remote.input.socket.port", "8082"); + } + + virtual void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(3)); + } + + void cleanup() { + unlink(ss.str().c_str()); + } + + void runAssertions() { + } + + protected: + bool isSecure; + char *dir; + std::stringstream ss; + TestController testController; +}; + +struct test_profile { + test_profile() + : flow_url_broken(false), + transaction_url_broken(false), + empty_transaction_url(false), + no_delete(false), + invalid_checksum(false) { + } + + bool allFalse() const { + return !flow_url_broken && !transaction_url_broken && !empty_transaction_url && !no_delete && !invalid_checksum; + } + // tests for a broken flow file url + bool flow_url_broken; + // transaction url will return incorrect information + bool transaction_url_broken; + // Location will be absent within the + bool empty_transaction_url; + // delete url is not supported. + bool no_delete; + // invalid checksum error + bool invalid_checksum; +}; + +void run_variance(std::string test_file_location, bool isSecure, std::string url, const struct test_profile &profile) { + SiteToSiteTestHarness harness(isSecure); + + SiteToSiteLocationResponder responder(isSecure); + + TransactionResponder transaction_response(url, "471deef6-2a6e-4a7d-912a-81cc17e3a204", true, profile.transaction_url_broken, profile.empty_transaction_url); + + std::string transaction_id = transaction_response.getTransactionId(); + + harness.setKeyDir(""); + + std::string controller_loc = url + "/controller"; + + harness.setUrl(controller_loc, &responder); + + std::string transaction_url = url + "/data-transfer/input-ports/471deef6-2a6e-4a7d-912a-81cc17e3a204/transactions"; + std::string action_url = url + "/site-to-site/input-ports/471deef6-2a6e-4a7d-912a-81cc17e3a204/transactions"; + + std::string transaction_output_url = url + "/data-transfer/output-ports/471deef6-2a6e-4a7d-912a-81cc17e3a203/transactions"; + std::string action_output_url = url 
+ "/site-to-site/output-ports/471deef6-2a6e-4a7d-912a-81cc17e3a203/transactions"; + + harness.setUrl(transaction_url, &transaction_response); + + std::string peer_url = url + "/site-to-site/peers"; + + PeerResponder peer_response(url); + + harness.setUrl(peer_url, &peer_response); + + std::string flow_url = action_url + "/" + transaction_id + "/flow-files"; + + FlowFileResponder flowResponder(true, profile.flow_url_broken, profile.invalid_checksum); + flowResponder.setFlowUrl(flow_url); + auto producedFlows = flowResponder.getFlows(); + + TransactionResponder transaction_response_output(url, "471deef6-2a6e-4a7d-912a-81cc17e3a203", false, profile.transaction_url_broken, profile.empty_transaction_url); + std::string transaction_output_id = transaction_response_output.getTransactionId(); + transaction_response_output.setFeed(producedFlows); + + harness.setUrl(transaction_output_url, &transaction_response_output); + + std::string flow_output_url = action_output_url + "/" + transaction_output_id + "/flow-files"; + + FlowFileResponder flowOutputResponder(false, profile.flow_url_broken, profile.invalid_checksum); + flowOutputResponder.setFlowUrl(flow_output_url); + flowOutputResponder.setFeed(producedFlows); + + harness.setUrl(flow_url, &flowResponder); + harness.setUrl(flow_output_url, &flowOutputResponder); + + if (!profile.no_delete) { + std::string delete_url = transaction_url + "/" + transaction_id; + DeleteTransactionResponder deleteResponse(delete_url, "201 OK", 12); + harness.setUrl(delete_url, &deleteResponse); + + std::string delete_output_url = transaction_output_url + "/" + transaction_output_id; + DeleteTransactionResponder deleteOutputResponse(delete_output_url, "201 OK", producedFlows); + harness.setUrl(delete_output_url, &deleteOutputResponse); + } + + harness.run(test_file_location); + + std::stringstream assertStr; + if (profile.allFalse()) { + assertStr << "Site2Site transaction " << transaction_id << " peer finished transaction"; + 
assert(LogTestController::getInstance().contains(assertStr.str()) == true); + } else if (profile.empty_transaction_url) { + assert(LogTestController::getInstance().contains("Location is empty") == true); + } else if (profile.transaction_url_broken) { + assert(LogTestController::getInstance().contains("Could not create transaction, intent is ohstuff") == true); + } else if (profile.invalid_checksum) { + assertStr << "Site2Site transaction " << transaction_id << " peer confirm transaction with CRC Imawrongchecksumshortandstout"; + assert(LogTestController::getInstance().contains(assertStr.str()) == true); + assertStr.str(std::string()); + assertStr << "Site2Site transaction " << transaction_id << " CRC not matched"; + assert(LogTestController::getInstance().contains(assertStr.str()) == true); + assertStr.str(std::string()); + assertStr << "Site2Site delete transaction " << transaction_id; + assert(LogTestController::getInstance().contains(assertStr.str()) == true); + } else if (profile.no_delete) { + assert(LogTestController::getInstance().contains("Received 401 response code from delete") == true); + } else { + assertStr << "Site2Site transaction " << transaction_id << " peer unknown respond code 254"; + assert(LogTestController::getInstance().contains(assertStr.str()) == true); + } + LogTestController::getInstance().reset(); +} + +int main(int argc, char **argv) { + transaction_id = 0; + transaction_id_output = 0; + std::string key_dir, test_file_location, url; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + url = argv[3]; + } + + bool isSecure = false; + if (url.find("https") != std::string::npos) { + isSecure = true; + } + + { + struct test_profile profile; + run_variance(test_file_location, isSecure, url, profile); + } + + { + struct test_profile profile; + profile.flow_url_broken = true; + run_variance(test_file_location, isSecure, url, profile); + } + + { + struct test_profile profile; + profile.empty_transaction_url = true; + 
run_variance(test_file_location, isSecure, url, profile); + } + + { + struct test_profile profile; + profile.transaction_url_broken = true; + run_variance(test_file_location, isSecure, url, profile); + } + + { + struct test_profile profile; + profile.no_delete = true; + run_variance(test_file_location, isSecure, url, profile); + } + + { + struct test_profile profile; + profile.invalid_checksum = true; + run_variance(test_file_location, isSecure, url, profile); + } + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/HttpGetIntegrationTest.cpp ---------------------------------------------------------------------- diff --git a/extensions/civet_curl_tests/HttpGetIntegrationTest.cpp b/extensions/civet_curl_tests/HttpGetIntegrationTest.cpp new file mode 100644 index 0000000..df40497 --- /dev/null +++ b/extensions/civet_curl_tests/HttpGetIntegrationTest.cpp @@ -0,0 +1,162 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#define CURLOPT_SSL_VERIFYPEER_DISABLE 1 +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include "HTTPClient.h" +#include "InvokeHTTP.h" +#include "TestServer.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "processors/InvokeHTTP.h" +#include "processors/ListenHTTP.h" +#include "processors/LogAttribute.h" + +void waitToVerifyProcessor() { + std::this_thread::sleep_for(std::chrono::seconds(10)); +} + +int log_message(const struct mg_connection *conn, const char *message) { + puts(message); + return 1; +} + +int ssl_enable(void *ssl_context, void *user_data) { + struct ssl_ctx_st *ctx = (struct ssl_ctx_st *) ssl_context; + return 0; +} + +class HttpResponder : public CivetHandler { + public: + bool handleGet(CivetServer *server, struct mg_connection *conn) { + static const std::string site2site_rest_resp = "hi this is a get test"; + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + site2site_rest_resp.length()); + mg_printf(conn, "%s", site2site_rest_resp.c_str()); + return true; + } +}; + +int main(int argc, char **argv) { + init_webserver(); + LogTestController::getInstance().setDebug<core::Processor>(); + LogTestController::getInstance().setDebug<core::ProcessSession>(); + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setDebug<minifi::controllers::SSLContextService>(); + LogTestController::getInstance().setDebug<minifi::processors::InvokeHTTP>(); + 
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); + std::string key_dir, test_file_location; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + } + std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>(); + configuration->set(minifi::Configure::nifi_default_directory, key_dir); + mkdir("content_repository", S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + + std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>(); + std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>(); + + configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location); + + std::shared_ptr<minifi::io::StreamFactory> stream_factory = std::make_shared<minifi::io::StreamFactory>(configuration); + + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + + content_repo->initialize(configuration); + + std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>( + new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location)); + std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo); + + std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), + content_repo, + DEFAULT_ROOT_GROUP_NAME, + true); + + core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location); + + std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location); + std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get()); + std::shared_ptr<core::Processor> proc = ptr->findProcessor("invoke"); + assert(proc != nullptr); + + std::shared_ptr<minifi::processors::InvokeHTTP> inv = 
std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc); + + assert(inv != nullptr); + std::string url = ""; + inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url); + ptr.release(); + HttpResponder h_ex; + std::string port, scheme, path; + CivetServer *server = nullptr; + + parse_http_components(url, port, scheme, path); + struct mg_callbacks callback; + if (url.find("localhost") != std::string::npos) { + if (scheme == "https") { + std::string cert = ""; + cert = key_dir + "nifi-cert.pem"; + memset(&callback, 0, sizeof(callback)); + callback.init_ssl = ssl_enable; + port +="s"; + callback.log_message = log_message; + server = start_webserver(port, path, &h_ex, &callback, cert, cert); + } else { + server = start_webserver(port, path, &h_ex); + } + } + controller->load(); + controller->start(); + waitToVerifyProcessor(); + + controller->waitUnload(60000); + if (url.find("localhost") == std::string::npos) { + stop_webserver(server); + exit(1); + } + std::string logs = LogTestController::getInstance().log_output.str(); + + assert(logs.find("key:filename value:") != std::string::npos); + assert(logs.find("key:invokehttp.request.url value:" + url) != std::string::npos); + assert(logs.find("key:invokehttp.status.code value:200") != std::string::npos); + + LogTestController::getInstance().reset(); + rmdir("./content_repository"); + stop_webserver(server); + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/HttpPostIntegrationTest.cpp ---------------------------------------------------------------------- diff --git a/extensions/civet_curl_tests/HttpPostIntegrationTest.cpp b/extensions/civet_curl_tests/HttpPostIntegrationTest.cpp new file mode 100644 index 0000000..853fdc6 --- /dev/null +++ b/extensions/civet_curl_tests/HttpPostIntegrationTest.cpp @@ -0,0 +1,114 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include "HTTPClient.h" +#include "InvokeHTTP.h" +#include "processors/ListenHTTP.h" +#include "processors/LogAttribute.h" +#include <sstream> +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "CivetServer.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "controllers/SSLContextService.h" +#include "TestServer.h" +#include "integration/HTTPIntegrationBase.h" + +class HttpTestHarness : public HTTPIntegrationBase { + public: + HttpTestHarness() { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); + } + + void testSetup() { + LogTestController::getInstance().setDebug<minifi::FlowController>(); + LogTestController::getInstance().setDebug<core::ProcessGroup>(); + 
LogTestController::getInstance().setDebug<minifi::SchedulingAgent>(); + LogTestController::getInstance().setDebug<core::ProcessContext>(); + LogTestController::getInstance().setDebug<processors::InvokeHTTP>(); + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setDebug<processors::ListenHTTP>(); + LogTestController::getInstance().setDebug<processors::ListenHTTP::WriteCallback>(); + LogTestController::getInstance().setDebug<processors::ListenHTTP::Handler>(); + LogTestController::getInstance().setDebug<processors::LogAttribute>(); + LogTestController::getInstance().setDebug<core::Processor>(); + LogTestController::getInstance().setDebug<minifi::ThreadedSchedulingAgent>(); + LogTestController::getInstance().setDebug<minifi::TimerDrivenSchedulingAgent>(); + LogTestController::getInstance().setDebug<minifi::core::ProcessSession>(); + std::fstream file; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + configuration->set("nifi.flow.engine.threads", "8"); + configuration->set("nifi.c2.enable", "false"); + } + + void cleanup() { + unlink(ss.str().c_str()); + } + + void runAssertions() { + assert(LogTestController::getInstance().contains("curl performed") == true); + assert(LogTestController::getInstance().contains("Size:1024 Offset:0") == true); + assert(LogTestController::getInstance().contains("Size:0 Offset:0") == false); + } + + protected: + char *dir; + std::stringstream ss; + TestController testController; +}; + +int main(int argc, char **argv) { + std::string key_dir, test_file_location, url; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + } + + HttpTestHarness harness; + + harness.setKeyDir(key_dir); + + harness.run(test_file_location); + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/SiteToSiteRestTest.cpp 
---------------------------------------------------------------------- diff --git a/extensions/civet_curl_tests/SiteToSiteRestTest.cpp b/extensions/civet_curl_tests/SiteToSiteRestTest.cpp new file mode 100644 index 0000000..f235be1 --- /dev/null +++ b/extensions/civet_curl_tests/SiteToSiteRestTest.cpp @@ -0,0 +1,145 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#define CURLOPT_SSL_VERIFYPEER_DISABLE 1 +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "HTTPClient.h" +#include "InvokeHTTP.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "CivetServer.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "controllers/SSLContextService.h" +#include "TestServer.h" +#include "integration/HTTPIntegrationBase.h" + +class Responder : public CivetHandler { + public: + explicit Responder(bool isSecure) + : isSecure(isSecure) { + } + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::string site2site_rest_resp = "{" + "\"revision\": {" + "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\"" + "}," + "\"controller\": {" + "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\"," + "\"name\": \"NiFi Flow\"," + "\"remoteSiteListeningPort\": 10001," + "\"siteToSiteSecure\": "; + site2site_rest_resp += (isSecure ? 
"true" : "false"); + site2site_rest_resp += "}}"; + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + site2site_rest_resp.length()); + mg_printf(conn, "%s", site2site_rest_resp.c_str()); + return true; + } + + protected: + bool isSecure; +}; + +class SiteToSiteTestHarness : public HTTPIntegrationBase { + public: + explicit SiteToSiteTestHarness(bool isSecure) + : isSecure(isSecure) { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); + } + + void testSetup() { + LogTestController::getInstance().setDebug<minifi::RemoteProcessorGroupPort>(); + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setTrace<minifi::controllers::SSLContextService>(); + LogTestController::getInstance().setInfo<minifi::FlowController>(); + LogTestController::getInstance().setDebug<core::ConfigurableComponent>(); + + std::fstream file; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + } + + void cleanup() { + unlink(ss.str().c_str()); + } + + void runAssertions() { + if (isSecure) { + assert(LogTestController::getInstance().contains("process group remote site2site port 10001, is secure 1") == true); + } else { + assert(LogTestController::getInstance().contains("process group remote site2site port 10001, is secure 0") == true); + } + } + + protected: + bool isSecure; + char *dir; + std::stringstream ss; + TestController testController; +}; + +int main(int argc, char **argv) { + std::string key_dir, test_file_location, url; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + url = argv[3]; + } + + bool isSecure = false; + if (url.find("https") != std::string::npos) { + isSecure = true; + } + + SiteToSiteTestHarness harness(isSecure); + + Responder responder(isSecure); + + harness.setKeyDir(key_dir); + + harness.setUrl(url, &responder); + + 
harness.run(test_file_location); + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/ThreadPoolAdjust.cpp ---------------------------------------------------------------------- diff --git a/extensions/civet_curl_tests/ThreadPoolAdjust.cpp b/extensions/civet_curl_tests/ThreadPoolAdjust.cpp new file mode 100644 index 0000000..2785117 --- /dev/null +++ b/extensions/civet_curl_tests/ThreadPoolAdjust.cpp @@ -0,0 +1,115 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <sys/stat.h> +#undef NDEBUG +#include <cassert> +#include <utility> +#include <chrono> +#include <fstream> +#include <memory> +#include <string> +#include <thread> +#include <type_traits> +#include <vector> +#include <iostream> +#include <sstream> +#include "HTTPClient.h" +#include "InvokeHTTP.h" +#include "processors/ListenHTTP.h" +#include "TestBase.h" +#include "utils/StringUtils.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "core/ProcessGroup.h" +#include "core/yaml/YamlConfiguration.h" +#include "FlowController.h" +#include "properties/Configure.h" +#include "unit/ProvenanceTestHelper.h" +#include "io/StreamFactory.h" +#include "CivetServer.h" +#include "RemoteProcessorGroupPort.h" +#include "core/ConfigurableComponent.h" +#include "controllers/SSLContextService.h" +#include "TestServer.h" +#include "integration/HTTPIntegrationBase.h" +#include "processors/InvokeHTTP.h" +#include "processors/ListenHTTP.h" +#include "processors/LogAttribute.h" + +class HttpTestHarness : public IntegrationBase { + public: + HttpTestHarness() { + char format[] = "/tmp/ssth.XXXXXX"; + dir = testController.createTempDirectory(format); + } + + void testSetup() { + LogTestController::getInstance().setDebug<minifi::FlowController>(); + LogTestController::getInstance().setDebug<core::ProcessGroup>(); + LogTestController::getInstance().setDebug<minifi::SchedulingAgent>(); + LogTestController::getInstance().setDebug<core::ProcessContext>(); + LogTestController::getInstance().setDebug<processors::InvokeHTTP>(); + LogTestController::getInstance().setDebug<utils::HTTPClient>(); + LogTestController::getInstance().setDebug<processors::ListenHTTP>(); + LogTestController::getInstance().setDebug<processors::ListenHTTP::WriteCallback>(); + LogTestController::getInstance().setDebug<processors::ListenHTTP::Handler>(); + LogTestController::getInstance().setDebug<processors::LogAttribute>(); + LogTestController::getInstance().setDebug<core::Processor>(); + 
LogTestController::getInstance().setDebug<minifi::ThreadedSchedulingAgent>(); + LogTestController::getInstance().setDebug<minifi::TimerDrivenSchedulingAgent>(); + LogTestController::getInstance().setDebug<minifi::core::ProcessSession>(); + std::fstream file; + ss << dir << "/" << "tstFile.ext"; + file.open(ss.str(), std::ios::out); + file << "tempFile"; + file.close(); + configuration->set("nifi.flow.engine.threads", "1"); + } + + void cleanup() { + unlink(ss.str().c_str()); + } + + void runAssertions() { + assert(LogTestController::getInstance().contains("curl performed") == true); + assert(LogTestController::getInstance().contains("Size:1024 Offset:0") == true); + assert(LogTestController::getInstance().contains("Size:0 Offset:0") == false); + } + + protected: + char *dir; + std::stringstream ss; + TestController testController; +}; + +int main(int argc, char **argv) { + std::string key_dir, test_file_location, url; + if (argc > 1) { + test_file_location = argv[1]; + key_dir = argv[2]; + } + + HttpTestHarness harness; + + harness.setKeyDir(key_dir); + + harness.run(test_file_location); + + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/include/TestServer.h ---------------------------------------------------------------------- diff --git a/extensions/civet_curl_tests/include/TestServer.h b/extensions/civet_curl_tests/include/TestServer.h new file mode 100644 index 0000000..06f996c --- /dev/null +++ b/extensions/civet_curl_tests/include/TestServer.h @@ -0,0 +1,117 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_TEST_TESTSERVER_H_ +#define LIBMINIFI_TEST_TESTSERVER_H_ +#include <regex.h> +#include <string> +#include <iostream> +#include "civetweb.h" +#include "CivetServer.h" + + +/* Server context handle */ +static std::string resp_str; + +void init_webserver() { + mg_init_library(0); +} + + +CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler, struct mg_callbacks *callbacks, std::string &cert, std::string &ca_cert) { + const char *options[] = { "listening_ports", port.c_str(), "error_log_file", + "error.log", "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list", + "ALL", "ssl_verify_peer", "no", 0 }; + + std::vector<std::string> cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + CivetServer *server = new CivetServer(cpp_options); + + server->addHandler(rooturi, handler); + + return server; + +} + +CivetServer * start_webserver(std::string &port, std::string &rooturi, CivetHandler *handler) { + const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), 0 }; + + std::vector<std::string> cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + CivetServer *server = new CivetServer(cpp_options); + + server->addHandler(rooturi, handler); + + return server; + +} + +bool parse_http_components(const std::string &url, std::string &port, std::string &scheme, std::string &path) { + regex_t regex; + + const 
char *regexstr = "^(http|https)://(localhost:)([0-9]+)?(/.*)$"; + + int ret = regcomp(®ex, regexstr, REG_EXTENDED); + if (ret) { + return false; + } + + size_t potentialGroups = regex.re_nsub + 1; + regmatch_t groups[potentialGroups]; + if (regexec(®ex, url.c_str(), potentialGroups, groups, 0) == 0) { + for (int i = 0; i < potentialGroups; i++) { + if (groups[i].rm_so == -1) + break; + + std::string str(url.data() + groups[i].rm_so, groups[i].rm_eo - groups[i].rm_so); + switch (i) { + case 1: + scheme = str; + break; + case 3: + port = str; + break; + case 4: + path = str; + break; + default: + break; + } + } + } + if (path.empty() || scheme.empty() || port.empty()) + return false; + + regfree(®ex); + + return true; + +} + +static void stop_webserver(CivetServer *server) { + if (server != nullptr) + delete server; + + /* Un-initialize the library */ + mg_exit_library(); +} + +#endif
