http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/include/integration/HTTPIntegrationBase.h ---------------------------------------------------------------------- diff --git a/extensions/civet_curl_tests/include/integration/HTTPIntegrationBase.h b/extensions/civet_curl_tests/include/integration/HTTPIntegrationBase.h new file mode 100644 index 0000000..be80dce --- /dev/null +++ b/extensions/civet_curl_tests/include/integration/HTTPIntegrationBase.h @@ -0,0 +1,75 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_TEST_INTEGRATION_HTTPINTEGRATIONBASE_H_ +#define LIBMINIFI_TEST_INTEGRATION_HTTPINTEGRATIONBASE_H_ + +#include "TestServer.h" +#include "CivetServer.h" +#include "integration/IntegrationBase.h" + +int log_message(const struct mg_connection *conn, const char *message) { + puts(message); + return 1; +} + +int ssl_enable(void *ssl_context, void *user_data) { + struct ssl_ctx_st *ctx = (struct ssl_ctx_st *) ssl_context; + return 0; +} + +class HTTPIntegrationBase : public IntegrationBase { + public: + HTTPIntegrationBase() : IntegrationBase(), server(nullptr) {} + + void setUrl(std::string url, CivetHandler *handler); + + virtual ~HTTPIntegrationBase(); + + protected: + CivetServer *server; +}; + +HTTPIntegrationBase::~HTTPIntegrationBase() { + stop_webserver(server); +} + +void HTTPIntegrationBase::setUrl(std::string url, CivetHandler *handler) { + + parse_http_components(url, port, scheme, path); + struct mg_callbacks callback; + if (url.find("localhost") != std::string::npos) { + + if (server != nullptr){ + server->addHandler(path,handler); + return; + } + if (scheme == "https" && !key_dir.empty()) { + std::string cert = ""; + cert = key_dir + "nifi-cert.pem"; + memset(&callback, 0, sizeof(callback)); + callback.init_ssl = ssl_enable; + port += "s"; + callback.log_message = log_message; + server = start_webserver(port, path, handler, &callback, cert, cert); + } else { + server = start_webserver(port, path, handler); + } + } +} + +#endif /* LIBMINIFI_TEST_INTEGRATION_HTTPINTEGRATIONBASE_H_ */ \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/include/sitetositehttp/HTTPHandlers.h ---------------------------------------------------------------------- diff --git a/extensions/civet_curl_tests/include/sitetositehttp/HTTPHandlers.h b/extensions/civet_curl_tests/include/sitetositehttp/HTTPHandlers.h new file mode 100644 index 0000000..d188df3 --- /dev/null +++ b/extensions/civet_curl_tests/include/sitetositehttp/HTTPHandlers.h @@ -0,0 +1,320 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "civetweb.h" +#include "CivetServer.h" +#include "concurrentqueue.h" +#include "io/CivetStream.h" +#include "io/CRCStream.h" +#ifndef LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ +#define LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ +static std::atomic<int> transaction_id; +static std::atomic<int> transaction_id_output; + +class FlowObj { + public: + FlowObj() + : total_size(0) { + + } + explicit FlowObj(const FlowObj &&other) + : total_size(std::move(other.total_size)), + attributes(std::move(other.attributes)), + data(std::move(other.data)) { + + } + uint64_t total_size; + std::map<std::string, std::string> attributes; + std::vector<uint8_t> data; + +}; + +class SiteToSiteLocationResponder : public CivetHandler { + public: + explicit SiteToSiteLocationResponder(bool isSecure) + : isSecure(isSecure) { + } + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::string site2site_rest_resp = "{" + "\"revision\": {" + "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\"" + "}," + "\"controller\": {" + "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\"," + "\"name\": \"NiFi Flow\"," + "\"siteToSiteSecure\": "; + site2site_rest_resp += (isSecure ? 
"true" : "false"); + site2site_rest_resp += "}}"; + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + site2site_rest_resp.length()); + mg_printf(conn, "%s", site2site_rest_resp.c_str()); + return true; + } + + protected: + bool isSecure; +}; + +class PeerResponder : public CivetHandler { + public: + + explicit PeerResponder(const std::string base_url) + : base_url(base_url) { + } + + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::string site2site_rest_resp = "{\"peers\" : [{ \"hostname\": \"localhost\", \"port\": 8082, \"secure\": false, \"flowFileCount\" : 0 }] }"; + std::stringstream headers; + headers << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n"; + mg_printf(conn, "%s", headers.str().c_str()); + mg_printf(conn, "%s", site2site_rest_resp.c_str()); + return true; + } + + protected: + std::string base_url; +}; + +class TransactionResponder : public CivetHandler { + public: + + explicit TransactionResponder(const std::string base_url, std::string port_id, bool input_port, bool wrong_uri, bool empty_transaction_uri) + : base_url(base_url), + wrong_uri(wrong_uri), + empty_transaction_uri(empty_transaction_uri), + input_port(input_port), + port_id(port_id), + flow_files_feed_(nullptr) { + + if (input_port) { + transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de96"; + transaction_id_str += std::to_string(transaction_id.load()); + transaction_id++; + } else { + transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de95"; + transaction_id_str += std::to_string(transaction_id_output.load()); + transaction_id_output++; + } + } + + bool handlePost(CivetServer *server, struct mg_connection *conn) { + std::string site2site_rest_resp = ""; + std::stringstream headers; + headers << "HTTP/1.1 201 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() 
<< "\r\nx-location-uri-intent: "; + if (wrong_uri) + headers << "ohstuff\r\n"; + else + headers << "transaction-url\r\n"; + + std::string port_type; + + if (input_port) + port_type = "input-ports"; + else + port_type = "output-ports"; + if (!empty_transaction_uri) + headers << "Location: " << base_url << "/site-to-site/" << port_type << "/" << port_id << "/transactions/" << transaction_id_str << "\r\n"; + headers << "Connection: close\r\n\r\n"; + mg_printf(conn, "%s", headers.str().c_str()); + mg_printf(conn, "%s", site2site_rest_resp.c_str()); + return true; + } + + void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) { + flow_files_feed_ = feed; + } + + std::string getTransactionId() { + return transaction_id_str; + } + protected: + std::string base_url; + std::string transaction_id_str; + bool wrong_uri; + bool empty_transaction_uri; + bool input_port; + std::string port_id; + moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_; +}; + +class FlowFileResponder : public CivetHandler { + public: + + explicit FlowFileResponder(bool input_port, bool wrong_uri, bool invalid_checksum) + : wrong_uri(wrong_uri), + input_port(input_port), + invalid_checksum(invalid_checksum), + flow_files_feed_(nullptr) { + } + + moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *getFlows() { + return &flow_files_; + } + + void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) { + flow_files_feed_ = feed; + } + + bool handlePost(CivetServer *server, struct mg_connection *conn) { + std::string site2site_rest_resp = ""; + std::stringstream headers; + + if (!wrong_uri) { + minifi::io::CivetStream civet_stream(conn); + minifi::io::CRCStream<minifi::io::CivetStream> stream(&civet_stream); + uint32_t num_attributes; + uint64_t total_size = 0; + total_size += stream.read(num_attributes); + + auto flow = std::make_shared<FlowObj>(); + + for (int i = 0; i < num_attributes; i++) { + std::string name, value; + total_size += 
stream.readUTF(name, true); + total_size += stream.readUTF(value, true); + flow->attributes[name] = value; + } + uint64_t length; + total_size += stream.read(length); + + total_size += length; + flow->data.resize(length); + flow->total_size = total_size; + + assert(stream.readData(flow->data.data(), length) == length); + + assert(flow->attributes["path"] == "."); + assert(!flow->attributes["uuid"].empty()); + assert(!flow->attributes["filename"].empty()); + + if (!invalid_checksum) { + site2site_rest_resp = std::to_string(stream.getCRC()); + flow_files_.enqueue(flow); + } else { + site2site_rest_resp = "Imawrongchecksumshortandstout"; + } + + headers << "HTTP/1.1 202 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n"; + } else { + headers << "HTTP/1.1 404\r\nConnection: close\r\n\r\n"; + } + + mg_printf(conn, "%s", headers.str().c_str()); + mg_printf(conn, "%s", site2site_rest_resp.c_str()); + return true; + } + + bool handleGet(CivetServer *server, struct mg_connection *conn) { + + if (flow_files_feed_->size_approx() > 0) { + std::shared_ptr<FlowObj> flow; + uint8_t buf[1]; + std::vector<std::shared_ptr<FlowObj>> flows; + uint64_t total = 0; + + while (flow_files_feed_->try_dequeue(flow)) { + flows.push_back(flow); + total += flow->total_size; + } + mg_printf(conn, "HTTP/1.1 200 OK\r\n" + "Content-Length: %llu\r\n" + "Content-Type: application/octet-stream\r\n" + "Connection: close\r\n\r\n", + total); + minifi::io::BaseStream serializer; + minifi::io::CRCStream<minifi::io::BaseStream> stream(&serializer); + for (auto flow : flows) { + uint32_t num_attributes = flow->attributes.size(); + stream.write(num_attributes); + for (auto entry : flow->attributes) { + stream.writeUTF(entry.first); + stream.writeUTF(entry.second); + } + uint64_t length = flow->data.size(); + stream.write(length); + stream.writeData(flow->data.data(), length); + } + auto ret = mg_write(conn, serializer.getBuffer(), 
total); + } else { + std::cout << "Nothing to transfer feed" << std::endl; + mg_printf(conn, "HTTP/1.1 200 OK\r\nConnection: " + "close\r\nContent-Length: 0\r\n"); + mg_printf(conn, "Content-Type: text/plain\r\n\r\n"); + + } + return true; + } + + void setFlowUrl(std::string flowUrl) { + base_url = flowUrl; + } + + protected: + // base url + std::string base_url; + // set the wrong url + bool wrong_uri; + // we are running an input port + bool input_port; + // invalid checksum is returned. + bool invalid_checksum; + moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> flow_files_; + moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_; +}; + +class DeleteTransactionResponder : public CivetHandler { + public: + + explicit DeleteTransactionResponder(const std::string base_url, std::string response_code, int expected_resp_code) + : flow_files_feed_(nullptr), + base_url(base_url), + response_code(response_code) { + expected_resp_code_str = std::to_string(expected_resp_code); + } + + explicit DeleteTransactionResponder(const std::string base_url, std::string response_code, moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) + : flow_files_feed_(feed), + base_url(base_url), + response_code(response_code) { + } + + bool handleDelete(CivetServer *server, struct mg_connection *conn) { + + std::string site2site_rest_resp = ""; + std::stringstream headers; + std::string resp; + CivetServer::getParam(conn, "responseCode", resp); + headers << "HTTP/1.1 " << response_code << "\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\n"; + headers << "Connection: close\r\n\r\n"; + mg_printf(conn, "%s", headers.str().c_str()); + mg_printf(conn, "%s", site2site_rest_resp.c_str()); + return true; + } + + void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) { + flow_files_feed_ = feed; + } + + protected: + moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_; + std::string 
base_url; + std::string expected_resp_code_str; + std::string response_code; +}; + +#endif /* LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/unit/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/extensions/civet_curl_tests/unit/CMakeLists.txt b/extensions/civet_curl_tests/unit/CMakeLists.txt new file mode 100644 index 0000000..b645da1 --- /dev/null +++ b/extensions/civet_curl_tests/unit/CMakeLists.txt @@ -0,0 +1,76 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+#

file(GLOB CURL_UNIT_TESTS "unit/*.cpp")
file(GLOB CURL_INTEGRATION_TESTS "*.cpp")

SET(CURL_INT_TEST_COUNT 0)

# add_civet_curl_test(<source file> [<extra link item>...])
#
# Builds one test executable named after the source file, wires up the
# http-curl include directories and libraries, and bumps CURL_INT_TEST_COUNT.
# Extra link items are linked right after createTests() and before
# minifi-http-curl, which is the position the unit-test loop used for
# ${CATCH_MAIN_LIB}.
#
# This is a macro rather than a function on purpose: its body runs in the
# caller's variable scope, exactly as the two former inline loop bodies did.
macro(add_civet_curl_test curl_test_source)
  get_filename_component(testfilename "${curl_test_source}" NAME_WE)
  add_executable("${testfilename}" "${curl_test_source}")
  target_include_directories(${testfilename} BEFORE PRIVATE ${CURL_INCLUDE_DIRS})
  target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/")
  target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/client/")
  target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/processors/")
  target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/protocols/")
  target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/sitetosite/")
  target_link_libraries(${testfilename} ${CURL_LIBRARIES} )
  createTests("${testfilename}")
  if (NOT "${ARGN}" STREQUAL "")
    target_link_libraries(${testfilename} ${ARGN})
  endif()
  if (APPLE)
    target_link_libraries ("${testfilename}" -Wl,-all_load minifi-http-curl )
  else ()
    target_link_libraries ("${testfilename}" -Wl,--whole-archive minifi-http-curl -Wl,--no-whole-archive)
  endif()
  MATH(EXPR CURL_INT_TEST_COUNT "${CURL_INT_TEST_COUNT}+1")
endmacro()

# Unit tests additionally link the Catch main library.
FOREACH(testfile ${CURL_UNIT_TESTS})
  add_civet_curl_test("${testfile}" ${CATCH_MAIN_LIB})
ENDFOREACH()

FOREACH(testfile ${CURL_INTEGRATION_TESTS})
  add_civet_curl_test("${testfile}")
ENDFOREACH()

message("-- Finished building ${CURL_INT_TEST_COUNT} libcURL integration test file(s)...")

add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestNull.yml" "${TEST_RESOURCES}/")
add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGetSecure.yml" "${TEST_RESOURCES}/")
add_test(NAME HttpPostIntegrationTest COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPost.yml" "${TEST_RESOURCES}/")
add_test(NAME HttpPostIntegrationTestChunked COMMAND HttpPostIntegrationTest "${TEST_RESOURCES}/TestHTTPPostChunkedEncoding.yml" "${TEST_RESOURCES}/")
add_test(NAME C2VerifyServeResults COMMAND C2VerifyServeResults "${TEST_RESOURCES}/C2VerifyServeResults.yml" "${TEST_RESOURCES}/")
add_test(NAME C2VerifyHeartbeatAndStop COMMAND C2VerifyHeartbeatAndStop "${TEST_RESOURCES}/C2VerifyHeartbeatAndStop.yml" "${TEST_RESOURCES}/")
add_test(NAME SiteToSiteRestTest COMMAND SiteToSiteRestTest "${TEST_RESOURCES}/TestSite2SiteRest.yml" "${TEST_RESOURCES}/" "http://localhost:8077/nifi-api/site-to-site")
add_test(NAME ControllerServiceIntegrationTests COMMAND ControllerServiceIntegrationTests "${TEST_RESOURCES}/TestControllerServices.yml" "${TEST_RESOURCES}/")
add_test(NAME ThreadPoolAdjust COMMAND ThreadPoolAdjust "${TEST_RESOURCES}/ThreadPoolAdjust.yml" "${TEST_RESOURCES}/")
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civet_curl_tests/unit/InvokeHTTPTests.cpp ---------------------------------------------------------------------- diff --git a/extensions/civet_curl_tests/unit/InvokeHTTPTests.cpp b/extensions/civet_curl_tests/unit/InvokeHTTPTests.cpp new file mode 100644 index 0000000..81d2714 --- /dev/null +++ b/extensions/civet_curl_tests/unit/InvokeHTTPTests.cpp @@ -0,0 +1,315 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <uuid/uuid.h> +#include <fstream> +#include <map> +#include <memory> +#include <utility> +#include <string> +#include <set> +#include "FlowController.h" +#include "io/BaseStream.h" +#include "TestBase.h" +#include "processors/GetFile.h" +#include "core/Core.h" +#include "HTTPClient.h" +#include "InvokeHTTP.h" +#include "processors/ListenHTTP.h" +#include "core/FlowFile.h" +#include "unit/ProvenanceTestHelper.h" +#include "core/Processor.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessorNode.h" +#include "processors/InvokeHTTP.h" +#include "processors/ListenHTTP.h" +#include "processors/LogAttribute.h" + +TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") { + TestController testController; + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>(); + + std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); + + std::shared_ptr<core::Processor> getfileprocessor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); + + std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); + + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + std::shared_ptr<core::Processor> listenhttp = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp"); + + std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp"); + uuid_t processoruuid; + REQUIRE(true == listenhttp->getUUID(processoruuid)); + + uuid_t invokehttp_uuid; + REQUIRE(true == invokehttp->getUUID(invokehttp_uuid)); + + std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, 
"getfileCreate2Connection"); + gcConnection->setRelationship(core::Relationship("success", "description")); + + std::shared_ptr<minifi::Connection> laConnection = std::make_shared<minifi::Connection>(repo, content_repo, "logattribute"); + laConnection->setRelationship(core::Relationship("success", "description")); + + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection"); + connection->setRelationship(core::Relationship("success", "description")); + + std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp"); + + connection2->setRelationship(core::Relationship("No Retry", "description")); + + // link the connections so that we can test results at the end for this + connection->setSource(listenhttp); + + connection2->setSourceUUID(invokehttp_uuid); + connection->setSourceUUID(processoruuid); + connection->setDestinationUUID(invokehttp_uuid); + + listenhttp->addConnection(connection); + invokehttp->addConnection(connection); + invokehttp->addConnection(connection2); + + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(listenhttp); + std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(invokehttp); + std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; + std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); + std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo); + context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8686"); + context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest"); + + 
context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST"); + context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, "http://localhost:8686/testytesttest"); + auto session = std::make_shared<core::ProcessSession>(context); + auto session2 = std::make_shared<core::ProcessSession>(context2); + + REQUIRE(listenhttp->getName() == "listenhttp"); + + std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context); + + std::shared_ptr<core::FlowFile> record; + listenhttp->setScheduledState(core::ScheduledState::RUNNING); + listenhttp->onSchedule(context, factory); + listenhttp->onTrigger(context, session); + + invokehttp->incrementActiveTasks(); + invokehttp->setScheduledState(core::ScheduledState::RUNNING); + std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2); + invokehttp->onSchedule(context2, factory2); + invokehttp->onTrigger(context2, session2); + + provenance::ProvenanceReporter *reporter = session->getProvenanceReporter(); + std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents(); + record = session->get(); + REQUIRE(record == nullptr); + REQUIRE(records.size() == 0); + + listenhttp->incrementActiveTasks(); + listenhttp->setScheduledState(core::ScheduledState::RUNNING); + listenhttp->onTrigger(context, session); + + reporter = session->getProvenanceReporter(); + + records = reporter->getEvents(); + session->commit(); + + invokehttp->incrementActiveTasks(); + invokehttp->setScheduledState(core::ScheduledState::RUNNING); + invokehttp->onTrigger(context2, session2); + + session2->commit(); + records = reporter->getEvents(); + + for (provenance::ProvenanceEventRecord *provEventRecord : records) { + REQUIRE(provEventRecord->getComponentType() == listenhttp->getName()); + } + std::shared_ptr<core::FlowFile> ffr = session2->get(); + REQUIRE(true == 
LogTestController::getInstance().contains("exiting because method is POST")); + LogTestController::getInstance().reset(); +} + +class CallBack : public minifi::OutputStreamCallback { + public: + CallBack() { + } + virtual ~CallBack() { + } + virtual int64_t process(std::shared_ptr<minifi::io::BaseStream> stream) { + // leaving the typo for posterity sake + std::string st = "we're gnna write some test stuff"; + return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(st.c_str())), st.length()); + } +}; + +TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") { + TestController testController; + LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>(); + + std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>(); + + std::shared_ptr<core::Processor> getfileprocessor = std::make_shared<org::apache::nifi::minifi::processors::GetFile>("getfileCreate2"); + + std::shared_ptr<core::Processor> logAttribute = std::make_shared<org::apache::nifi::minifi::processors::LogAttribute>("logattribute"); + + char format[] = "/tmp/gt.XXXXXX"; + char *dir = testController.createTempDirectory(format); + + std::shared_ptr<core::Processor> listenhttp = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp"); + + std::shared_ptr<core::Processor> invokehttp = std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp"); + uuid_t processoruuid; + REQUIRE(true == listenhttp->getUUID(processoruuid)); + + uuid_t invokehttp_uuid; + REQUIRE(true == invokehttp->getUUID(invokehttp_uuid)); + + std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>(); + + std::shared_ptr<minifi::Connection> gcConnection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection"); + gcConnection->setRelationship(core::Relationship("success", "description")); + + std::shared_ptr<minifi::Connection> laConnection = 
std::make_shared<minifi::Connection>(repo, content_repo, "logattribute"); + laConnection->setRelationship(core::Relationship("success", "description")); + + std::shared_ptr<minifi::Connection> connection = std::make_shared<minifi::Connection>(repo, content_repo, "getfileCreate2Connection"); + connection->setRelationship(core::Relationship("success", "description")); + + std::shared_ptr<minifi::Connection> connection2 = std::make_shared<minifi::Connection>(repo, content_repo, "listenhttp"); + + connection2->setRelationship(core::Relationship("No Retry", "description")); + + // link the connections so that we can test results at the end for this + connection->setSource(listenhttp); + + connection->setSourceUUID(invokehttp_uuid); + connection->setDestinationUUID(processoruuid); + + connection2->setSourceUUID(processoruuid); + connection2->setSourceUUID(processoruuid); + + listenhttp->addConnection(connection); + invokehttp->addConnection(connection); + invokehttp->addConnection(connection2); + + + std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(listenhttp); + std::shared_ptr<core::ProcessorNode> node2 = std::make_shared<core::ProcessorNode>(invokehttp); + std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr; + std::shared_ptr<core::ProcessContext> context = std::make_shared<core::ProcessContext>(node, controller_services_provider, repo, repo, content_repo); + std::shared_ptr<core::ProcessContext> context2 = std::make_shared<core::ProcessContext>(node2, controller_services_provider, repo, repo, content_repo); + context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port, "8680"); + context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "/testytesttest"); + + context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST"); + context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL, 
"http://localhost:8680/testytesttest"); + auto session = std::make_shared<core::ProcessSession>(context); + auto session2 = std::make_shared<core::ProcessSession>(context2); + + REQUIRE(listenhttp->getName() == "listenhttp"); + + std::shared_ptr<core::ProcessSessionFactory> factory = std::make_shared<core::ProcessSessionFactory>(context); + + std::shared_ptr<core::FlowFile> record; + + CallBack callback; + + std::map<std::string, std::string> attributes; + attributes["testy"] = "test"; + std::shared_ptr<minifi::FlowFileRecord> flow = std::make_shared<minifi::FlowFileRecord>(repo, content_repo, attributes); + session2->write(flow, &callback); + + invokehttp->incrementActiveTasks(); + invokehttp->setScheduledState(core::ScheduledState::RUNNING); + std::shared_ptr<core::ProcessSessionFactory> factory2 = std::make_shared<core::ProcessSessionFactory>(context2); + invokehttp->onSchedule(context2, factory2); + invokehttp->onTrigger(context2, session2); + + listenhttp->incrementActiveTasks(); + listenhttp->setScheduledState(core::ScheduledState::RUNNING); + listenhttp->onSchedule(context, factory); + listenhttp->onTrigger(context, session); + + provenance::ProvenanceReporter *reporter = session->getProvenanceReporter(); + std::set<provenance::ProvenanceEventRecord*> records = reporter->getEvents(); + record = session->get(); + REQUIRE(record == nullptr); + REQUIRE(records.size() == 0); + + listenhttp->incrementActiveTasks(); + listenhttp->setScheduledState(core::ScheduledState::RUNNING); + listenhttp->onTrigger(context, session); + + reporter = session->getProvenanceReporter(); + + records = reporter->getEvents(); + session->commit(); + + invokehttp->incrementActiveTasks(); + invokehttp->setScheduledState(core::ScheduledState::RUNNING); + invokehttp->onTrigger(context2, session2); + + session2->commit(); + records = reporter->getEvents(); + + for (provenance::ProvenanceEventRecord *provEventRecord : records) { + REQUIRE(provEventRecord->getComponentType() == 
listenhttp->getName()); + } + std::shared_ptr<core::FlowFile> ffr = session2->get(); + REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST")); + LogTestController::getInstance().reset(); +} + +TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") { + TestController testController; + LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::InvokeHTTP>(); + LogTestController::getInstance().setInfo<org::apache::nifi::minifi::processors::ListenHTTP>(); + LogTestController::getInstance().setInfo<core::Processor>(); + + std::shared_ptr<TestPlan> plan = testController.createPlan(); + std::shared_ptr<core::Processor> processor = plan->addProcessor("ListenHTTP", "listenhttp", core::Relationship("No Retry", "description"), false); + std::shared_ptr<core::Processor> invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp", core::Relationship("success", "description"), true); + + REQUIRE(true == plan->setProperty(processor, org::apache::nifi::minifi::processors::ListenHTTP::Port.getName(), "8685")); + REQUIRE(true == plan->setProperty(processor, org::apache::nifi::minifi::processors::ListenHTTP::BasePath.getName(), "/testytesttest")); + + REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::Method.getName(), "POST")); + REQUIRE(true == plan->setProperty(invokehttp, org::apache::nifi::minifi::processors::InvokeHTTP::URL.getName(), "http://localhost:8685/testytesttest")); + plan->reset(); + testController.runSession(plan, true); + + std::set<provenance::ProvenanceEventRecord*> records = plan->getProvenanceRecords(); + std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile(); + REQUIRE(record == nullptr); + REQUIRE(records.size() == 0); + + plan->reset(); + testController.runSession(plan, true); + + records = plan->getProvenanceRecords(); + record = plan->getCurrentFlowFile(); + + for (provenance::ProvenanceEventRecord *provEventRecord : records) { + 
REQUIRE(provEventRecord->getComponentType() == processor->getName()); + } + std::shared_ptr<core::FlowFile> ffr = plan->getCurrentFlowFile(); + REQUIRE(true == LogTestController::getInstance().contains("exiting because method is POST")); + LogTestController::getInstance().reset(); +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civetweb/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/extensions/civetweb/CMakeLists.txt b/extensions/civetweb/CMakeLists.txt new file mode 100644 index 0000000..d31baad --- /dev/null +++ b/extensions/civetweb/CMakeLists.txt @@ -0,0 +1,70 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +set(CMAKE_EXE_LINKER_FLAGS "-Wl,--export-all-symbols") +set(CMAKE_SHARED_LINKER_FLAGS "-Wl,--export-symbols") + +include_directories(${CMAKE_SOURCE_DIR}/libminifi/include + ${CMAKE_SOURCE_DIR}/libminifi/include/core + ${CMAKE_SOURCE_DIR}/thirdparty/spdlog-20170710/include + ${CMAKE_SOURCE_DIR}/thirdparty/concurrentqueue + ${CMAKE_SOURCE_DIR}/thirdparty/yaml-cpp-yaml-cpp-0.5.3/include + ${CIVET_THIRDPARTY_ROOT}/include + ${CMAKE_SOURCE_DIR}/thirdparty/jsoncpp/include + ${CMAKE_SOURCE_DIR}/thirdparty/ + ./include) + +set(BUILD_CIVET_TESTING OFF) +add_subdirectory(${CIVET_THIRDPARTY_ROOT} + ${CIVET_BINARY_ROOT} + EXCLUDE_FROM_ALL) + +file(GLOB SOURCES "*.cpp") + +add_library(minifi-civet-extensions STATIC ${SOURCES}) +set_property(TARGET minifi-civet-extensions PROPERTY POSITION_INDEPENDENT_CODE ON) +target_link_libraries(minifi-civet-extensions c-library civetweb-cpp) + +if(THREADS_HAVE_PTHREAD_ARG) + target_compile_options(minifi-civet-extensions PUBLIC "-pthread") # target name comes first, then the scope keyword +endif() +if(CMAKE_THREAD_LIBS_INIT) + target_link_libraries(minifi-civet-extensions "${CMAKE_THREAD_LIBS_INIT}") +endif() + + +if (WIN32) + set_target_properties(minifi-civet-extensions PROPERTIES + LINK_FLAGS "/WHOLEARCHIVE" + ) +elseif (APPLE) + set_target_properties(minifi-civet-extensions PROPERTIES + LINK_FLAGS "-Wl,-all_load" + ) +else () + set_target_properties(minifi-civet-extensions PROPERTIES + LINK_FLAGS "-Wl,--whole-archive" + ) +endif () + + +SET (civet-EXTENSIONS minifi-civet-extensions PARENT_SCOPE) + +register_extension(minifi-civet-extensions) + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civetweb/CivetLoader.cpp ---------------------------------------------------------------------- diff --git a/extensions/civetweb/CivetLoader.cpp b/extensions/civetweb/CivetLoader.cpp new file mode 100644 index 0000000..c593bb7 --- /dev/null +++ b/extensions/civetweb/CivetLoader.cpp @@ -0,0 +1,29 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "CivetLoader.h" +#include "core/FlowConfiguration.h" + +bool CivetFactory::added = core::FlowConfiguration::add_static_func("createCivetFactory"); + +extern "C" { + +void *createCivetFactory(void) { + return new CivetFactory(); +} + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civetweb/ListenHTTP.cpp ---------------------------------------------------------------------- diff --git a/extensions/civetweb/ListenHTTP.cpp b/extensions/civetweb/ListenHTTP.cpp new file mode 100644 index 0000000..62f8194 --- /dev/null +++ b/extensions/civetweb/ListenHTTP.cpp @@ -0,0 +1,333 @@ +/** + * @file ListenHTTP.cpp + + * ListenHTTP class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "processors/ListenHTTP.h" +#include <uuid/uuid.h> +#include <CivetServer.h> +#include <stdio.h> +#include <sstream> +#include <utility> +#include <memory> +#include <string> +#include <iostream> +#include <fstream> +#include <set> +#include <vector> +#include "utils/TimeUtil.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/ProcessSessionFactory.h" +#include "core/logging/LoggerConfiguration.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property ListenHTTP::BasePath("Base Path", "Base path for incoming connections", "contentListener"); +core::Property ListenHTTP::Port("Listening Port", "The Port to listen on for incoming connections", ""); +core::Property ListenHTTP::AuthorizedDNPattern("Authorized DN Pattern", "A Regular Expression to apply against the Distinguished Name of incoming" + " connections. 
If the Pattern does not match the DN, the connection will be refused.", + ".*"); +core::Property ListenHTTP::SSLCertificate("SSL Certificate", "File containing PEM-formatted file including TLS/SSL certificate and key", ""); +core::Property ListenHTTP::SSLCertificateAuthority("SSL Certificate Authority", "File containing trusted PEM-formatted certificates", ""); +core::Property ListenHTTP::SSLVerifyPeer("SSL Verify Peer", "Whether or not to verify the client's certificate (yes/no)", "no"); +core::Property ListenHTTP::SSLMinimumVersion("SSL Minimum Version", "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", "SSL2"); +core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as Attributes (Regex)", "Specifies the Regular Expression that determines the names of HTTP Headers that" + " should be passed along as FlowFile attributes", + ""); + +core::Relationship ListenHTTP::Success("success", "All files are routed to success"); + +void ListenHTTP::initialize() { + logger_->log_info("Initializing ListenHTTP"); + + // Set the supported properties + std::set<core::Property> properties; + properties.insert(BasePath); + properties.insert(Port); + properties.insert(AuthorizedDNPattern); + properties.insert(SSLCertificate); + properties.insert(SSLCertificateAuthority); + properties.insert(SSLVerifyPeer); + properties.insert(SSLMinimumVersion); + properties.insert(HeadersAsAttributesRegex); + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); +} + +void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string basePath; + + if (!context->getProperty(BasePath.getName(), basePath)) { + logger_->log_info("%s attribute is missing, so default value of %s will be used", BasePath.getName().c_str(), BasePath.getValue().c_str()); + basePath = 
BasePath.getValue(); + } + + basePath.insert(0, "/"); + + std::string listeningPort; + + if (!context->getProperty(Port.getName(), listeningPort)) { + logger_->log_error("%s attribute is missing or invalid", Port.getName().c_str()); + return; + } + + std::string authDNPattern; + + if (context->getProperty(AuthorizedDNPattern.getName(), authDNPattern) && !authDNPattern.empty()) { + logger_->log_info("ListenHTTP using %s: %s", AuthorizedDNPattern.getName().c_str(), authDNPattern.c_str()); + } + + std::string sslCertFile; + + if (context->getProperty(SSLCertificate.getName(), sslCertFile) && !sslCertFile.empty()) { + logger_->log_info("ListenHTTP using %s: %s", SSLCertificate.getName().c_str(), sslCertFile.c_str()); + } + + // Read further TLS/SSL options only if TLS/SSL usage is implied by virtue of certificate value being set + std::string sslCertAuthorityFile; + std::string sslVerifyPeer; + std::string sslMinVer; + + if (!sslCertFile.empty()) { + if (context->getProperty(SSLCertificateAuthority.getName(), sslCertAuthorityFile) && !sslCertAuthorityFile.empty()) { + logger_->log_info("ListenHTTP using %s: %s", SSLCertificateAuthority.getName().c_str(), sslCertAuthorityFile.c_str()); + } + + if (context->getProperty(SSLVerifyPeer.getName(), sslVerifyPeer)) { + if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) { + logger_->log_info("ListenHTTP will not verify peers"); + } else { + logger_->log_info("ListenHTTP will verify peers"); + } + } else { + logger_->log_info("ListenHTTP will not verify peers"); + } + + if (context->getProperty(SSLMinimumVersion.getName(), sslMinVer)) { + logger_->log_info("ListenHTTP using %s: %s", SSLMinimumVersion.getName().c_str(), sslMinVer.c_str()); + } + } + + std::string headersAsAttributesPattern; + + if (context->getProperty(HeadersAsAttributesRegex.getName(), headersAsAttributesPattern) && !headersAsAttributesPattern.empty()) { + logger_->log_info("ListenHTTP using %s: %s", HeadersAsAttributesRegex.getName().c_str(), 
headersAsAttributesPattern.c_str()); + } + + auto numThreads = getMaxConcurrentTasks(); + + logger_->log_info("ListenHTTP starting HTTP server on port %s and path %s with %d threads", listeningPort.c_str(), basePath.c_str(), numThreads); + + // Initialize web server + std::vector<std::string> options; + options.push_back("enable_keep_alive"); + options.push_back("yes"); + options.push_back("keep_alive_timeout_ms"); + options.push_back("15000"); + options.push_back("num_threads"); + options.push_back(std::to_string(numThreads)); + + if (sslCertFile.empty()) { + options.push_back("listening_ports"); + options.push_back(listeningPort); + } else { + listeningPort += "s"; + options.push_back("listening_ports"); + options.push_back(listeningPort); + + options.push_back("ssl_certificate"); + options.push_back(sslCertFile); + + if (!sslCertAuthorityFile.empty()) { + options.push_back("ssl_ca_file"); + options.push_back(sslCertAuthorityFile); + } + + if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) { + options.push_back("ssl_verify_peer"); + options.push_back("no"); + } else { + options.push_back("ssl_verify_peer"); + options.push_back("yes"); + } + + if (sslMinVer.compare("SSL2") == 0) { + options.push_back("ssl_protocol_version"); + options.push_back(std::to_string(0)); + } else if (sslMinVer.compare("SSL3") == 0) { + options.push_back("ssl_protocol_version"); + options.push_back(std::to_string(1)); + } else if (sslMinVer.compare("TLS1.0") == 0) { + options.push_back("ssl_protocol_version"); + options.push_back(std::to_string(2)); + } else if (sslMinVer.compare("TLS1.1") == 0) { + options.push_back("ssl_protocol_version"); + options.push_back(std::to_string(3)); + } else { + options.push_back("ssl_protocol_version"); + options.push_back(std::to_string(4)); + } + } + + _server.reset(new CivetServer(options)); + _handler.reset(new Handler(context, sessionFactory, std::move(authDNPattern), std::move(headersAsAttributesPattern))); + _server->addHandler(basePath, 
_handler.get()); +} + +ListenHTTP::~ListenHTTP() { +} + +void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { + std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get()); + + // Do nothing if there are no incoming files + if (!flowFile) { + return; + } +} + +ListenHTTP::Handler::Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern) + : _authDNRegex(std::move(authDNPattern)), + _headersAsAttributesRegex(std::move(headersAsAttributesPattern)), + logger_(logging::LoggerFactory<ListenHTTP::Handler>::getLogger()) { + _processContext = context; + _processSessionFactory = sessionFactory; +} + +void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn) { + mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n" + "Content-Type: text/html\r\n" + "Content-Length: 0\r\n\r\n"); +} + +bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *conn) { + auto req_info = mg_get_request_info(conn); + logger_->log_info("ListenHTTP handling POST request of length %ll", req_info->content_length); + + // If this is a two-way TLS connection, authorize the peer against the configured pattern + if (req_info->is_ssl && req_info->client_cert != nullptr) { + if (!std::regex_match(req_info->client_cert->subject, _authDNRegex)) { + mg_printf(conn, "HTTP/1.1 403 Forbidden\r\n" + "Content-Type: text/html\r\n" + "Content-Length: 0\r\n\r\n"); + logger_->log_warn("ListenHTTP client DN not authorized: %s", req_info->client_cert->subject); + return true; + } + } + + // Always send 100 Continue, as allowed per standard to minimize client delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html) + mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n"); + + auto session = _processSessionFactory->createSession(); + ListenHTTP::WriteCallback callback(conn, req_info); + auto flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->create()); + + if (!flowFile) { + sendErrorResponse(conn); + return true; + } + + try { + session->write(flowFile, &callback); + + // Add filename from "filename" header value (and pattern headers) + for (int i = 0; i < req_info->num_headers; i++) { + auto header = &req_info->http_headers[i]; + + if (strcmp("filename", header->name) == 0) { + if (!flowFile->updateAttribute("filename", header->value)) { + flowFile->addAttribute("filename", header->value); + } + } else if (std::regex_match(header->name, _headersAsAttributesRegex)) { + if (!flowFile->updateAttribute(header->name, header->value)) { + flowFile->addAttribute(header->name, header->value); + } + } + } + + session->transfer(flowFile, Success); + session->commit(); + } catch (std::exception &exception) { + logger_->log_debug("ListenHTTP Caught Exception %s", exception.what()); + sendErrorResponse(conn); + session->rollback(); + throw; + } catch (...) { + logger_->log_debug("ListenHTTP Caught Exception Processor::onTrigger"); + sendErrorResponse(conn); + session->rollback(); + throw; + } + + mg_printf(conn, "HTTP/1.1 200 OK\r\n" + "Content-Type: text/html\r\n" + "Content-Length: 0\r\n\r\n"); + + return true; +} + +ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo) + : logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger()) { + _conn = conn; + _reqInfo = reqInfo; +} + +int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) { + int64_t rlen; + int64_t nlen = 0; + int64_t tlen = _reqInfo->content_length; + uint8_t buf[16384]; + + // if we have no content length we should call mg_read until + // there is no data left from the stream to be HTTP/1.1 compliant + while (tlen == -1 || nlen < tlen) { + rlen = tlen == -1 ? 
sizeof(buf) : tlen - nlen; + + if (rlen > (int64_t)sizeof(buf)) { + rlen = (int64_t)sizeof(buf); + } + + // Read a buffer of data from client + rlen = mg_read(_conn, &buf[0], (size_t) rlen); + + if (rlen <= 0) { + break; + } + + // Transfer buffer data to the output stream + stream->write(&buf[0], rlen); + + nlen += rlen; + } + + return nlen; +} + +} /* namespace processors */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civetweb/RESTReceiver.cpp ---------------------------------------------------------------------- diff --git a/extensions/civetweb/RESTReceiver.cpp b/extensions/civetweb/RESTReceiver.cpp new file mode 100644 index 0000000..1f015ad --- /dev/null +++ b/extensions/civetweb/RESTReceiver.cpp @@ -0,0 +1,147 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "c2/protocols/RESTReceiver.h" +#include <algorithm> +#include <memory> +#include <utility> +#include <map> +#include <string> +#include <vector> + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +int log_message(const struct mg_connection *conn, const char *message) { + puts(message); + return 1; +} + +int ssl_protocol_en(void *ssl_context, void *user_data) { + return 0; +} + +RESTReceiver::RESTReceiver(std::string name, uuid_t uuid) + : HeartBeatReporter(name, uuid), + logger_(logging::LoggerFactory<RESTReceiver>::getLogger()) { +} + +void RESTReceiver::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) { + HeartBeatReporter::initialize(controller, configure); + logger_->log_debug("Initializing rest receiveer"); + if (nullptr != configuration_) { + std::string listeningPort, rootUri, caCert; + configuration_->get("c2.rest.listener.port", listeningPort); + configuration_->get("c2.rest.listener.heartbeat.rooturi", rootUri); + configuration_->get("c2.rest.listener.cacert", caCert); + + if (!listeningPort.empty() && !rootUri.empty()) { + handler = std::unique_ptr<ListeningProtocol>(new ListeningProtocol()); + if (!caCert.empty()) { + listener = start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get()), caCert); + } else { + listener = start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get())); + } + } + } +} +int16_t RESTReceiver::heartbeat(const C2Payload &payload) { + std::string operation_request_str = getOperation(payload); + std::string outputConfig; + Json::Value json_payload; + json_payload["operation"] = operation_request_str; + if (payload.getIdentifier().length() > 0) { + json_payload["operationid"] = payload.getIdentifier(); + } + const std::vector<C2ContentResponse> &content = payload.getContent(); + + for (const auto &payload_content : content) { + Json::Value 
payload_content_values; + bool use_sub_option = true; + if (payload_content.op == payload.getOperation()) { + for (auto content : payload_content.operation_arguments) { + if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) { + json_payload[payload_content.name] = content.second; + use_sub_option = false; + } else { + payload_content_values[content.first] = content.second; + } + } + } + if (use_sub_option) + json_payload[payload_content.name] = payload_content_values; + } + + for (const auto &nested_payload : payload.getNestedPayloads()) { + json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload); + } + + Json::StyledWriter writer; + outputConfig = writer.write(json_payload); + if (handler != nullptr) { + logger_->log_debug("Setting %s", outputConfig.c_str()); // %s expects a C string, as every other log call in this extension passes + handler->setResponse(outputConfig); + } + + return 0; +} + +std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert) { + struct mg_callbacks callback; + + memset(&callback, 0, sizeof(callback)); + callback.init_ssl = ssl_protocol_en; + std::string my_port = port; + my_port += "s"; // the trailing "s" is how a listening port is marked as TLS + callback.log_message = log_message; + const char *options[] = { "listening_ports", my_port.c_str(), "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list", "ALL", + "ssl_verify_peer", "no", "num_threads", "1", 0 }; + + std::vector<std::string> cpp_options; + for (uint32_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options)); + + server->addHandler(rooturi, handler); + + return server; +} + +std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler) { + const char *options[] = { "document_root", ".", "listening_ports", 
port.c_str(), "num_threads", "1", 0 }; + + std::vector<std::string> cpp_options; + for (uint32_t i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options)); + + server->addHandler(rooturi, handler); + + return server; +} + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civetweb/include/CivetLoader.h ---------------------------------------------------------------------- diff --git a/extensions/civetweb/include/CivetLoader.h b/extensions/civetweb/include/CivetLoader.h new file mode 100644 index 0000000..f571b5d --- /dev/null +++ b/extensions/civetweb/include/CivetLoader.h @@ -0,0 +1,70 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef EXTENSION_CIVETLOADER_H +#define EXTENSION_CIVETLOADER_H + +#include "core/ClassLoader.h" +#include "c2/protocols/RESTReceiver.h" +#include "processors/ListenHTTP.h" + +class __attribute__((visibility("default"))) CivetFactory : public core::ObjectFactory { + public: + CivetFactory() { + + } + + /** + * Gets the name of the object. + * @return class name of processor + */ + virtual std::string getName() { + return "CivetFactory"; + } + + virtual std::string getClassName() { + return "CivetFactory"; + } + /** + * Gets the class name for the object + * @return class name for the processor. + */ + virtual std::vector<std::string> getClassNames() { + std::vector<std::string> class_names; + class_names.push_back("RESTReceiver"); + class_names.push_back("ListenHTTP"); + return class_names; + } + + virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) { + if (utils::StringUtils::equalsIgnoreCase(class_name, "RESTReceiver")) { + return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::c2::RESTReceiver>()); + } else if (utils::StringUtils::equalsIgnoreCase(class_name, "ListenHTTP")) { + return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<processors::ListenHTTP>()); + } else { + return nullptr; + } + } + + static bool added; + +}; + +extern "C" { +void *createCivetFactory(void); +} +#endif /* EXTENSION_CIVETLOADER_H */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civetweb/include/c2/protocols/RESTReceiver.h ---------------------------------------------------------------------- diff --git a/extensions/civetweb/include/c2/protocols/RESTReceiver.h b/extensions/civetweb/include/c2/protocols/RESTReceiver.h new file mode 100644 index 0000000..4793ee3 --- /dev/null +++ b/extensions/civetweb/include/c2/protocols/RESTReceiver.h @@ -0,0 +1,110 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ +#define LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ + +#include "json/json.h" +#include "json/writer.h" +#include <string> +#include <mutex> +#include "core/Resource.h" +#include "c2/protocols/RESTProtocol.h" +#include "CivetServer.h" +#include "c2/C2Protocol.h" +#include "controllers/SSLContextService.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +int log_message(const struct mg_connection *conn, const char *message); + +int ssl_protocol_en(void *ssl_context, void *user_data); + +/** + * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol. + * + * The external interfaces rely solely on send, where send includes a Direction. Transmit will perform a POST + * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction + * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively. 
+ * + */ +class RESTReceiver : public RESTProtocol, public HeartBeatReporter { + public: + RESTReceiver(std::string name, uuid_t uuid = nullptr); + + virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override; + virtual int16_t heartbeat(const C2Payload &heartbeat) override; + + protected: + // Civet handler that answers GET requests with the most recently stored response text + class ListeningProtocol : public CivetHandler { + + public: + ListeningProtocol() { + + } + // Copies the stored response under the mutex, then writes the status line, headers and body to conn + bool handleGet(CivetServer *server, struct mg_connection *conn) { + std::string currentvalue; + { // the lock is scoped to the copy only, so the writes below run unlocked + std::lock_guard<std::mutex> lock(reponse_mutex_); + currentvalue = resp_; + } + + std::stringstream output; + output << "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: " << currentvalue.length() << "\r\nConnection: close\r\n\r\n"; + + mg_printf(conn, "%s", output.str().c_str()); + mg_printf(conn, "%s", currentvalue.c_str()); + return true; + } + // Replaces the stored response text, guarded by the same mutex handleGet takes + void setResponse(std::string response) { + std::lock_guard<std::mutex> lock(reponse_mutex_); + resp_ = response; + } + + protected: + std::mutex reponse_mutex_; // guards resp_ + std::string resp_; // body returned by handleGet + + }; + // Overload taking a certificate path in ca_cert + std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert); + // Overload without a certificate + std::unique_ptr<CivetServer> start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler); + + std::unique_ptr<CivetServer> listener; + std::unique_ptr<ListeningProtocol> handler; + + private: + std::shared_ptr<logging::Logger> logger_; +}; + +REGISTER_RESOURCE(RESTReceiver); + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* LIBMINIFI_INCLUDE_C2_RESTRCVR_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civetweb/include/io/CivetStream.h ---------------------------------------------------------------------- diff --git 
a/extensions/civetweb/include/io/CivetStream.h b/extensions/civetweb/include/io/CivetStream.h new file mode 100644 index 0000000..571b0ca --- /dev/null +++ b/extensions/civetweb/include/io/CivetStream.h @@ -0,0 +1,138 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_ +#define EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_ + +#include <memory> +#include <thread> +#include <mutex> +#include <future> +#include <vector> + +#include "io/BaseStream.h" +#include "civetweb.h" +#include "CivetServer.h" +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { + +class CivetStream : public io::BaseStream { + public: + /** + * Constructs a stream over a civetweb connection. + * The connection pointer is stored as given; reads on this stream are served from it. + */ + explicit CivetStream(struct mg_connection *conn) + : io::BaseStream(), conn(conn) { + + } + + virtual ~CivetStream() { + } + /** + * Skip to the specified offset (the body is empty, so this is currently a no-op). 
+ * @param offset offset to which we will skip + */ + void seek(uint64_t offset){ + + } + + const uint64_t getSize() const { + return BaseStream::readBuffer; + } + + // data stream extensions + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + virtual int readData(std::vector<uint8_t> &buf, int buflen) { + if (buf.capacity() < buflen) { + buf.resize(buflen); + } + int ret = readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen); + + if (ret < buflen) { + buf.resize(ret); + } + return ret; + } + + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + */ + virtual int readData(uint8_t *buf, int buflen) { + return mg_read(conn,buf,buflen); + } + + /** + * Write value to the stream using std::vector + * @param buf incoming buffer + * @param buflen buffer to write + * + */ + virtual int writeData(std::vector<uint8_t> &buf, int buflen) { + return 0; + } + + /** + * writes value to stream + * @param value value to write + * @param size size of value + */ + virtual int writeData(uint8_t *value, int size) { + return 0; + } + + protected: + + /** + * Creates a vector and returns the vector using the provided + * type name. + * @param t incoming object + * @returns vector. 
+ */ + template<typename T> + inline std::vector<uint8_t> readBuffer(const T& t) { + std::vector<uint8_t> buf; + buf.resize(sizeof t); + readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t)); + return buf; + } + + void reset(); + + //size_t pos; + struct mg_connection *conn; + + private: + + std::shared_ptr<logging::Logger> logger_; +}; +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ + +#endif /* EXTENSIONS_HTTP_CURL_CLIENT_CIVETSTREAM_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/civetweb/include/processors/ListenHTTP.h ---------------------------------------------------------------------- diff --git a/extensions/civetweb/include/processors/ListenHTTP.h b/extensions/civetweb/include/processors/ListenHTTP.h new file mode 100644 index 0000000..1b58dcd --- /dev/null +++ b/extensions/civetweb/include/processors/ListenHTTP.h @@ -0,0 +1,121 @@ +/** + * @file ListenHTTP.h + * ListenHTTP class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+#ifndef __LISTEN_HTTP_H__
+#define __LISTEN_HTTP_H__
+
+#include <memory>
+#include <regex>
+
+#include <CivetServer.h>
+
+#include "FlowFileRecord.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/Core.h"
+#include "core/Resource.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// Processor that owns a CivetServer and a request Handler
+class ListenHTTP : public core::Processor {
+ public:
+
+  // Constructor
+  /*!
+   * Create a new processor with the given name and optional UUID
+   */
+  ListenHTTP(std::string name, uuid_t uuid = nullptr)
+      : Processor(name, uuid),
+        logger_(logging::LoggerFactory<ListenHTTP>::getLogger()) {
+  }
+  // Destructor
+  virtual ~ListenHTTP();
+  // Processor Name
+  static constexpr char const* ProcessorName = "ListenHTTP";
+  // Supported Properties
+  static core::Property BasePath;
+  static core::Property Port;
+  static core::Property AuthorizedDNPattern;
+  static core::Property SSLCertificate;
+  static core::Property SSLCertificateAuthority;
+  static core::Property SSLVerifyPeer;
+  static core::Property SSLMinimumVersion;
+  static core::Property HeadersAsAttributesRegex;
+  // Supported Relationships
+  static core::Relationship Success;
+
+  void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+  void initialize();
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+
+  // CivetHandler subclass declaring the POST request handler
+  class Handler : public CivetHandler {
+   public:
+    Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern);
+    bool handlePost(CivetServer *server, struct mg_connection *conn);
+
+   private:
+    // Send HTTP 500 error response to client
+    void sendErrorResponse(struct mg_connection *conn);
+    // Logger
+    std::shared_ptr<logging::Logger> logger_;
+
+    std::regex _authDNRegex;
+    std::regex _headersAsAttributesRegex;
+    core::ProcessContext *_processContext;
+    core::ProcessSessionFactory *_processSessionFactory;
+  };
+
+  // Write callback for transferring data from HTTP request to content repo
+  class WriteCallback : public OutputStreamCallback {
+   public:
+    WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo);
+    int64_t process(std::shared_ptr<io::BaseStream> stream);
+
+   private:
+    // Logger
+    std::shared_ptr<logging::Logger> logger_;
+
+    struct mg_connection *_conn;
+    const struct mg_request_info *_reqInfo;
+  };
+
+ private:
+  // Logger
+  std::shared_ptr<logging::Logger> logger_;
+
+  std::unique_ptr<CivetServer> _server;
+  std::unique_ptr<Handler> _handler;
+};
+
+REGISTER_RESOURCE(ListenHTTP);
+
+} /* namespace processors */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/extensions/http-curl/CMakeLists.txt b/extensions/http-curl/CMakeLists.txt
index e38f3b5..b855e8e 100644
--- a/extensions/http-curl/CMakeLists.txt
+++ b/extensions/http-curl/CMakeLists.txt
@@ -44,10 +44,12 @@ endif(CURL_FOUND)
 # Include UUID
 find_package(UUID REQUIRED)
 #set(LINK_FLAGS ${LINK_FLAGS} "-Wl,-whole-archive")
+
 # Include OpenSSL
 find_package(OpenSSL REQUIRED)
 include_directories(${OPENSSL_INCLUDE_DIR})
-target_link_libraries(minifi-http-curl ${CMAKE_DL_LIBS} )
+target_link_libraries(minifi-http-curl ${CMAKE_DL_LIBS} ${OPENSSL_LIBRARIES})
+
 find_package(ZLIB REQUIRED)
 include_directories(${ZLIB_INCLUDE_DIRS})
 target_link_libraries (minifi-http-curl ${ZLIB_LIBRARIES})
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/HTTPCurlLoader.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/HTTPCurlLoader.h
b/extensions/http-curl/HTTPCurlLoader.h index 797aebf..9a6e821 100644 --- a/extensions/http-curl/HTTPCurlLoader.h +++ b/extensions/http-curl/HTTPCurlLoader.h @@ -18,9 +18,8 @@ #ifndef EXTENSIONS_HTTPCURLLOADER_H_ #define EXTENSIONS_HTTPCURLLOADER_H_ -#include "protocols/RESTProtocol.h" -#include "protocols/RESTSender.h" -#include "protocols/RESTReceiver.h" +#include "c2/protocols/RESTProtocol.h" +#include "c2/protocols/RESTSender.h" #include "processors/InvokeHTTP.h" #include "client/HTTPClient.h" #include "core/ClassLoader.h" @@ -52,7 +51,6 @@ class __attribute__((visibility("default"))) HttpCurlObjectFactory : public core std::vector<std::string> class_names; class_names.push_back("RESTProtocol"); class_names.push_back("HttpProtocol"); - class_names.push_back("RESTReceiver"); class_names.push_back("RESTSender"); class_names.push_back("InvokeHTTP"); class_names.push_back("HTTPClient"); @@ -61,9 +59,7 @@ class __attribute__((visibility("default"))) HttpCurlObjectFactory : public core } virtual std::unique_ptr<ObjectFactory> assign(const std::string &class_name) override{ - if (utils::StringUtils::equalsIgnoreCase(class_name, "RESTReceiver")) { - return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::c2::RESTReceiver>()); - } else if (utils::StringUtils::equalsIgnoreCase(class_name, "RESTSender")) { + if (utils::StringUtils::equalsIgnoreCase(class_name, "RESTSender")) { return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::c2::RESTSender>()); } else if (utils::StringUtils::equalsIgnoreCase(class_name, "InvokeHTTP")) { return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<processors::InvokeHTTP>()); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/RESTSender.cpp ---------------------------------------------------------------------- diff --git a/extensions/http-curl/RESTSender.cpp b/extensions/http-curl/RESTSender.cpp new file mode 100644 index 0000000..839c70b --- 
/dev/null
+++ b/extensions/http-curl/RESTSender.cpp
@@ -0,0 +1,155 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "c2/protocols/RESTSender.h"
+
+#include <algorithm>
+#include <memory>
+#include <utility>
+#include <map>
+#include <string>
+#include <vector>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+// The logger is keyed on RESTSender so that log output is attributed to this class.
+RESTSender::RESTSender(std::string name, uuid_t uuid)
+    : C2Protocol(name, uuid),
+      logger_(logging::LoggerFactory<RESTSender>::getLogger()) {
+}
+
+// Initializes the base protocol, then reads the REST and acknowledgement URLs from configure.
+void RESTSender::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) {
+  C2Protocol::initialize(controller, configure);
+  // Without a configuration the URLs keep whatever value they already hold.
+  if (nullptr != configure) {
+    configure->get("c2.rest.url", rest_uri_);
+    configure->get("c2.rest.url.ack", ack_uri_);
+  }
+  logger_->log_info("Submitting to %s", rest_uri_);
+}
+
+// Builds the JSON request body when direction is TRANSMIT (it stays empty otherwise)
+// and passes it, together with url, to sendPayload.
+C2Payload RESTSender::consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) {
+  std::string operation_request_str = getOperation(payload);
+  std::string outputConfig;
+  if (direction == Direction::TRANSMIT) {
+    Json::Value json_payload;
+    json_payload["operation"] = operation_request_str;
+    if (payload.getIdentifier().length() > 0) {
+      json_payload["operationid"] = payload.getIdentifier();
+    }
+    const std::vector<C2ContentResponse> &content = payload.getContent();
+
+    for (const auto &payload_content : content) {
+      Json::Value payload_content_values;
+      bool use_sub_option = true;
+      if (payload_content.op == payload.getOperation()) {
+        for (const auto &argument : payload_content.operation_arguments) {
+          // A lone argument named after its content entry is written directly under that name.
+          if (payload_content.operation_arguments.size() == 1 && payload_content.name == argument.first) {
+            json_payload[payload_content.name] = argument.second;
+            use_sub_option = false;
+          } else {
+            payload_content_values[argument.first] = argument.second;
+          }
+        }
+      }
+      if (use_sub_option) {
+        json_payload[payload_content.name] = payload_content_values;
+      }
+    }
+
+    for (const auto &nested_payload : payload.getNestedPayloads()) {
+      json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload);
+    }
+
+    Json::StyledWriter writer;
+    outputConfig = writer.write(json_payload);
+  }
+
+  return sendPayload(url, direction, payload, outputConfig);
+}
+
+// Routes ACKNOWLEDGE operations to the acknowledgement URL and everything else to the REST URL.
+C2Payload RESTSender::consumePayload(const C2Payload &payload, Direction direction, bool async) {
+  if (payload.getOperation() == ACKNOWLEDGE) {
+    return consumePayload(ack_uri_, payload, direction, async);
+  }
+  return consumePayload(rest_uri_, payload, direction, async);
+}
+
+// Re-reads the REST and acknowledgement URLs so that later requests use the supplied configuration.
+void RESTSender::update(const std::shared_ptr<Configure> &configure) {
+  // Mirror initialize(): tolerate a missing configuration instead of dereferencing null.
+  if (nullptr == configure) {
+    return;
+  }
+  // Store into the members; reading into a discarded local made this call a no-op.
+  configure->get("c2.rest.url", rest_uri_);
+  configure->get("c2.rest.url.ack", ack_uri_);
+}
+
+// Performs the HTTP exchange: POSTs outputConfig when direction is TRANSMIT, otherwise issues a GET.
+// Returns the raw or parsed response body, or a READ_ERROR payload when submit fails or no response code is set.
+const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) {
+  utils::HTTPClient client(url, ssl_context_service_);
+  client.setConnectionTimeout(2);
+
+  std::unique_ptr<utils::ByteInputCallBack> input = nullptr;
+  std::unique_ptr<utils::HTTPUploadCallback> callback = nullptr;
+  if (direction == Direction::TRANSMIT) {
+    input = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack());
+    callback = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback);
+    input->write(outputConfig);
+    callback->ptr = input.get();
+    callback->pos = 0;
+    client.set_request_method("POST");
+    client.setUploadCallback(callback.get());
+  } else {
+    // we do not need to set the upload callback
+    // since we are not uploading anything on a get
+    client.set_request_method("GET");
+  }
+  client.setContentType("application/json");
+  bool isOkay = client.submit();
+  int64_t respCode = client.getResponseCode();
+
+  if (isOkay && respCode) {
+    if (payload.isRaw()) {
+      C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true);
+
+      response_payload.setRawData(client.getResponseBody());
+      return response_payload;
+    }
+    return parseJsonResponse(payload, client.getResponseBody());
+  } else {
+    return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true);
+  }
+}
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0981f9ac/extensions/http-curl/c2/protocols/RESTSender.h
----------------------------------------------------------------------
diff --git a/extensions/http-curl/c2/protocols/RESTSender.h b/extensions/http-curl/c2/protocols/RESTSender.h
new file mode 100644
index 0000000..450799c
--- /dev/null
+++
b/extensions/http-curl/c2/protocols/RESTSender.h
@@ -0,0 +1,87 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_C2_RESTSENDER_H_
+#define LIBMINIFI_INCLUDE_C2_RESTSENDER_H_
+
+#include "json/json.h"
+#include "json/writer.h"
+#include <string>
+#include <memory>
+#include <mutex>
+
+#include "utils/ByteArrayCallback.h"
+#include "c2/C2Protocol.h"
+#include "c2/protocols/RESTProtocol.h"
+#include "c2/HeartBeatReporter.h"
+#include "controllers/SSLContextService.h"
+#include "../client/HTTPClient.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace c2 {
+
+/**
+ * Purpose and Justification: Encapsulates the restful protocol that is built upon C2Protocol.
+ *
+ * The external interfaces rely solely on send, where send includes a Direction. TRANSMIT will perform a POST
+ * and RECEIVE will perform a GET. This does not mean we can't receive on a POST; however, since Direction
+ * will encompass other protocols the context of its meaning here simply translates into POST and GET respectively.
+ *
+ */
+class RESTSender : public RESTProtocol, public C2Protocol {
+ public:
+
+  explicit RESTSender(std::string name, uuid_t uuid = nullptr);
+
+  // Exchanges payload with the given url; the request body is only built when direction is TRANSMIT.
+  virtual C2Payload consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) override;
+
+  // Same exchange, with the destination URL chosen by this sender from the payload's operation.
+  virtual C2Payload consumePayload(const C2Payload &payload, Direction direction, bool async) override;
+
+  // Hook for re-reading protocol settings from the supplied configuration.
+  virtual void update(const std::shared_ptr<Configure> &configure) override;
+
+  // Initializes the base protocol and reads the "c2.rest.url" and "c2.rest.url.ack" settings.
+  virtual void initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) override;
+
+ protected:
+
+  // Issues the HTTP request for url: a POST carrying outputConfig when direction is TRANSMIT, otherwise a GET.
+  virtual const C2Payload sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig);
+
+  // Passed to the HTTP client created for each request.
+  std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+  std::string rest_uri_;
+  std::string ack_uri_;
+};
+
+REGISTER_RESOURCE(RESTSender);
+
+} /* namespace c2 */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_C2_RESTSENDER_H_ */
