Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 18e87d2c1 -> 0b85b6983
MINIFICPP-443 Add support for GET requests in ListenHTTP and allow response body to be configured This closes #292. Signed-off-by: Marc Parisi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/0b85b698 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/0b85b698 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/0b85b698 Branch: refs/heads/master Commit: 0b85b6983393f5b0258ad3b33301d892feb21b0f Parents: 18e87d2 Author: Andrew I. Christianson <[email protected]> Authored: Wed Mar 21 21:20:54 2018 -0400 Committer: Marc Parisi <[email protected]> Committed: Thu Apr 19 19:59:59 2018 -0400 ---------------------------------------------------------------------- CMakeLists.txt | 2 +- PROCESSORS.md | 13 +- extensions/civetweb/processors/ListenHTTP.cpp | 351 +++++++++++++------ extensions/civetweb/processors/ListenHTTP.h | 80 ++++- libminifi/test/civetweb-tests/CMakeLists.txt | 39 +++ libminifi/test/civetweb-tests/CivetwebTests.cpp | 113 ++++++ 6 files changed, 469 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0b85b698/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 328568c..163e637 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -204,7 +204,7 @@ endif() option(DISABLE_CIVET "Disables CivetWeb components." OFF) if (NOT DISABLE_CIVET) -createExtension(CIVETWEB CIVETWEB "This enables ListenHTTP" "extensions/civetweb") + createExtension(CIVETWEB CIVETWEB "This enables ListenHTTP" "extensions/civetweb" "${TEST_DIR}/civetweb-tests") endif() if (NOT DISABLE_CURL AND NOT DISABLE_CONTROLLER) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0b85b698/PROCESSORS.md ---------------------------------------------------------------------- diff --git a/PROCESSORS.md b/PROCESSORS.md index 8fb870a..fe55b1c 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -452,10 +452,19 @@ default values, and whether a property supports the NiFi Expression Language. Starts an HTTP Server and listens on a given base path to transform incoming requests into FlowFiles. The default URI of the Service will be -`http://{hostname}:{port}/contentListener`. Only HEAD and POST requests are -supported. GET, PUT, and DELETE will result in an error and the HTTP response +`http://{hostname}:{port}/contentListener`. Only HEAD, POST, and GET requests are +supported. PUT, and DELETE will result in an error and the HTTP response status code 405. +The response body text for all requests, by default, is empty (length of 0). A +static response body can be set for a given URI by sending input files to +ListenHTTP with the `http.type` attribute set to `response_body`. The response +body FlowFile `filename` attribute is appended to the `Base Path` property +(separated by a `/`) when mapped to incoming requests. The `mime.type` +attribute of the response body FlowFile is used for the `Content-type` header +in responses. Response body content can be cleared by sending an empty (size 0) +FlowFile for a given URI mapping. + ### Properties In the list below, the names of required properties appear in bold. Any other http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0b85b698/extensions/civetweb/processors/ListenHTTP.cpp ---------------------------------------------------------------------- diff --git a/extensions/civetweb/processors/ListenHTTP.cpp b/extensions/civetweb/processors/ListenHTTP.cpp index d94df9b..46e65e8 100644 --- a/extensions/civetweb/processors/ListenHTTP.cpp +++ b/extensions/civetweb/processors/ListenHTTP.cpp @@ -19,22 +19,6 @@ * limitations under the License. */ #include "ListenHTTP.h" -#include <uuid/uuid.h> -#include <CivetServer.h> -#include <stdio.h> -#include <sstream> -#include <utility> -#include <memory> -#include <string> -#include <iostream> -#include <fstream> -#include <set> -#include <vector> -#include "utils/TimeUtil.h" -#include "core/ProcessContext.h" -#include "core/ProcessSession.h" -#include "core/ProcessSessionFactory.h" -#include "core/logging/LoggerConfiguration.h" namespace org { namespace apache { @@ -44,15 +28,21 @@ namespace processors { core::Property ListenHTTP::BasePath("Base Path", "Base path for incoming connections", "contentListener"); core::Property ListenHTTP::Port("Listening Port", "The Port to listen on for incoming connections", ""); -core::Property ListenHTTP::AuthorizedDNPattern("Authorized DN Pattern", "A Regular Expression to apply against the Distinguished Name of incoming" - " connections. If the Pattern does not match the DN, the connection will be refused.", - ".*"); -core::Property ListenHTTP::SSLCertificate("SSL Certificate", "File containing PEM-formatted file including TLS/SSL certificate and key", ""); -core::Property ListenHTTP::SSLCertificateAuthority("SSL Certificate Authority", "File containing trusted PEM-formatted certificates", ""); -core::Property ListenHTTP::SSLVerifyPeer("SSL Verify Peer", "Whether or not to verify the client's certificate (yes/no)", "no"); -core::Property ListenHTTP::SSLMinimumVersion("SSL Minimum Version", "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", "SSL2"); -core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as Attributes (Regex)", "Specifies the Regular Expression that determines the names of HTTP Headers that" - " should be passed along as FlowFile attributes", +core::Property ListenHTTP::AuthorizedDNPattern + ("Authorized DN Pattern", "A Regular Expression to apply against the Distinguished Name of incoming" + " connections. If the Pattern does not match the DN, the connection will be refused.", + ".*"); +core::Property ListenHTTP::SSLCertificate + ("SSL Certificate", "File containing PEM-formatted file including TLS/SSL certificate and key", ""); +core::Property ListenHTTP::SSLCertificateAuthority + ("SSL Certificate Authority", "File containing trusted PEM-formatted certificates", ""); +core::Property + ListenHTTP::SSLVerifyPeer("SSL Verify Peer", "Whether or not to verify the client's certificate (yes/no)", "no"); +core::Property ListenHTTP::SSLMinimumVersion + ("SSL Minimum Version", "Minimum TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", "SSL2"); +core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as Attributes (Regex)", + "Specifies the Regular Expression that determines the names of HTTP Headers that" + " should be passed along as FlowFile attributes", ""); core::Relationship ListenHTTP::Success("success", "All files are routed to success"); @@ -81,7 +71,9 @@ void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionF std::string basePath; if (!context->getProperty(BasePath.getName(), basePath)) { - logger_->log_info("%s attribute is missing, so default value of %s will be used", BasePath.getName(), BasePath.getValue()); + logger_->log_info("%s attribute is missing, so default value of %s will be used", + BasePath.getName(), + BasePath.getValue()); basePath = BasePath.getValue(); } @@ -112,7 +104,8 @@ void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionF std::string sslMinVer; if (!sslCertFile.empty()) { - if (context->getProperty(SSLCertificateAuthority.getName(), sslCertAuthorityFile) && !sslCertAuthorityFile.empty()) { + if (context->getProperty(SSLCertificateAuthority.getName(), sslCertAuthorityFile) + && !sslCertAuthorityFile.empty()) { logger_->log_debug("ListenHTTP using %s: %s", SSLCertificateAuthority.getName(), sslCertAuthorityFile); } @@ -133,172 +126,300 @@ void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionF std::string headersAsAttributesPattern; - if (context->getProperty(HeadersAsAttributesRegex.getName(), headersAsAttributesPattern) && !headersAsAttributesPattern.empty()) { + if (context->getProperty(HeadersAsAttributesRegex.getName(), headersAsAttributesPattern) + && !headersAsAttributesPattern.empty()) { logger_->log_debug("ListenHTTP using %s: %s", HeadersAsAttributesRegex.getName(), headersAsAttributesPattern); } auto numThreads = getMaxConcurrentTasks(); - logger_->log_info("ListenHTTP starting HTTP server on port %s and path %s with %d threads", listeningPort, basePath, numThreads); + logger_->log_info("ListenHTTP starting HTTP server on port %s and path %s with %d threads", + listeningPort, + basePath, + numThreads); // Initialize web server std::vector<std::string> options; - options.push_back("enable_keep_alive"); - options.push_back("yes"); - options.push_back("keep_alive_timeout_ms"); - options.push_back("15000"); - options.push_back("num_threads"); - options.push_back(std::to_string(numThreads)); + options.emplace_back("enable_keep_alive"); + options.emplace_back("yes"); + options.emplace_back("keep_alive_timeout_ms"); + options.emplace_back("15000"); + options.emplace_back("num_threads"); + options.emplace_back(std::to_string(numThreads)); if (sslCertFile.empty()) { - options.push_back("listening_ports"); - options.push_back(listeningPort); + options.emplace_back("listening_ports"); + options.emplace_back(listeningPort); } else { listeningPort += "s"; - options.push_back("listening_ports"); - options.push_back(listeningPort); + options.emplace_back("listening_ports"); + options.emplace_back(listeningPort); - options.push_back("ssl_certificate"); - options.push_back(sslCertFile); + options.emplace_back("ssl_certificate"); + options.emplace_back(sslCertFile); if (!sslCertAuthorityFile.empty()) { - options.push_back("ssl_ca_file"); - options.push_back(sslCertAuthorityFile); + options.emplace_back("ssl_ca_file"); + options.emplace_back(sslCertAuthorityFile); } - if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) { - options.push_back("ssl_verify_peer"); - options.push_back("no"); + if (sslVerifyPeer.empty() || sslVerifyPeer == "no") { + options.emplace_back("ssl_verify_peer"); + options.emplace_back("no"); } else { - options.push_back("ssl_verify_peer"); - options.push_back("yes"); + options.emplace_back("ssl_verify_peer"); + options.emplace_back("yes"); } - if (sslMinVer.compare("SSL2") == 0) { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(0)); - } else if (sslMinVer.compare("SSL3") == 0) { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(1)); - } else if (sslMinVer.compare("TLS1.0") == 0) { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(2)); - } else if (sslMinVer.compare("TLS1.1") == 0) { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(3)); + if (sslMinVer == "SSL2") { + options.emplace_back("ssl_protocol_version"); + options.emplace_back(std::to_string(0)); + } else if (sslMinVer == "SSL3") { + options.emplace_back("ssl_protocol_version"); + options.emplace_back(std::to_string(1)); + } else if (sslMinVer == "TLS1.0") { + options.emplace_back("ssl_protocol_version"); + options.emplace_back(std::to_string(2)); + } else if (sslMinVer == "TLS1.1") { + options.emplace_back("ssl_protocol_version"); + options.emplace_back(std::to_string(3)); } else { - options.push_back("ssl_protocol_version"); - options.push_back(std::to_string(4)); + options.emplace_back("ssl_protocol_version"); + options.emplace_back(std::to_string(4)); } } - _server.reset(new CivetServer(options)); - _handler.reset(new Handler(context, sessionFactory, std::move(authDNPattern), std::move(headersAsAttributesPattern))); - _server->addHandler(basePath, _handler.get()); + server_.reset(new CivetServer(options)); + handler_.reset(new Handler(basePath, + context, + sessionFactory, + std::move(authDNPattern), + std::move(headersAsAttributesPattern))); + server_->addHandler(basePath, handler_.get()); } ListenHTTP::~ListenHTTP() { } void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { - std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->get()); + std::shared_ptr<FlowFileRecord> flow_file = std::static_pointer_cast<FlowFileRecord>(session->get()); // Do nothing if there are no incoming files - if (!flowFile) { + if (!flow_file) { return; } + + std::string type; + flow_file->getAttribute("http.type", type); + + if (type == "response_body") { + + if (handler_) { + struct response_body response{"", "", ""}; + ResponseBodyReadCallback cb(&response.body); + flow_file->getAttribute("filename", response.uri); + flow_file->getAttribute("mime.type", response.mime_type); + if (response.mime_type.empty()) { + logger_->log_warn("Using default mime type of application/octet-stream for response body file: %s", + response.uri); + response.mime_type = "application/octet-stream"; + } + session->read(flow_file, &cb); + handler_->set_response_body(std::move(response)); + } + } + + session->remove(flow_file); } -ListenHTTP::Handler::Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern) - : _authDNRegex(std::move(authDNPattern)), - _headersAsAttributesRegex(std::move(headersAsAttributesPattern)), +ListenHTTP::Handler::Handler(std::string base_uri, + core::ProcessContext *context, + core::ProcessSessionFactory *session_factory, + std::string &&auth_dn_regex, + std::string &&header_as_attrs_regex) + : base_uri_(std::move(base_uri)), + auth_dn_regex_(std::move(auth_dn_regex)), + headers_as_attrs_regex_(std::move(header_as_attrs_regex)), logger_(logging::LoggerFactory<ListenHTTP::Handler>::getLogger()) { - _processContext = context; - _processSessionFactory = sessionFactory; + process_context_ = context; + session_factory_ = session_factory; +} + +void ListenHTTP::Handler::send_error_response(struct mg_connection *conn) { + mg_printf(conn, + "HTTP/1.1 500 Internal Server Error\r\n" + "Content-Type: text/html\r\n" + "Content-Length: 0\r\n\r\n"); } -void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn) { - mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n" - "Content-Type: text/html\r\n" - "Content-Length: 0\r\n\r\n"); +void ListenHTTP::Handler::set_header_attributes(const mg_request_info *req_info, + const std::shared_ptr<FlowFileRecord> &flow_file) const { + // Add filename from "filename" header value (and pattern headers) + for (int i = 0; i < req_info->num_headers; i++) { + auto header = &req_info->http_headers[i]; + + if (strcmp("filename", header->name) == 0) { + if (!flow_file->updateAttribute("filename", header->value)) { + flow_file->addAttribute("filename", header->value); + } + } else if (std::regex_match(header->name, headers_as_attrs_regex_)) { + if (!flow_file->updateAttribute(header->name, header->value)) { + flow_file->addAttribute(header->name, header->value); + } + } + } + + if (req_info->query_string) { + flow_file->addAttribute("http.query", req_info->query_string); + } } bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *conn) { auto req_info = mg_get_request_info(conn); logger_->log_debug("ListenHTTP handling POST request of length %ll", req_info->content_length); + if (!auth_request(conn, req_info)) { + return true; + } + + // Always send 100 Continue, as allowed per standard to minimize client delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html) + mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n"); + + auto session = session_factory_->createSession(); + ListenHTTP::WriteCallback callback(conn, req_info); + auto flow_file = std::static_pointer_cast<FlowFileRecord>(session->create()); + + if (!flow_file) { + send_error_response(conn); + return true; + } + + try { + session->write(flow_file, &callback); + set_header_attributes(req_info, flow_file); + + session->transfer(flow_file, Success); + session->commit(); + } catch (std::exception &exception) { + logger_->log_error("ListenHTTP Caught Exception %s", exception.what()); + send_error_response(conn); + session->rollback(); + throw; + } catch (...) { + logger_->log_error("ListenHTTP Caught Exception Processor::onTrigger"); + send_error_response(conn); + session->rollback(); + throw; + } + + mg_printf(conn, "HTTP/1.1 200 OK\r\n"); + write_body(conn, req_info); + + return true; +} + +bool ListenHTTP::Handler::auth_request(mg_connection *conn, const mg_request_info *req_info) const { // If this is a two-way TLS connection, authorize the peer against the configured pattern + bool authorized = true; if (req_info->is_ssl && req_info->client_cert != nullptr) { - if (!std::regex_match(req_info->client_cert->subject, _authDNRegex)) { + if (!std::regex_match(req_info->client_cert->subject, auth_dn_regex_)) { mg_printf(conn, "HTTP/1.1 403 Forbidden\r\n" - "Content-Type: text/html\r\n" - "Content-Length: 0\r\n\r\n"); + "Content-Type: text/html\r\n" + "Content-Length: 0\r\n\r\n"); logger_->log_warn("ListenHTTP client DN not authorized: %s", req_info->client_cert->subject); - return true; + authorized = false; } } + return authorized; +} - // Always send 100 Continue, as allowed per standard to minimize client delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html) - mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n"); +bool ListenHTTP::Handler::handleGet(CivetServer *server, struct mg_connection *conn) { + auto req_info = mg_get_request_info(conn); + logger_->log_debug("ListenHTTP handling GET request of URI %s", req_info->request_uri); - auto session = _processSessionFactory->createSession(); - ListenHTTP::WriteCallback callback(conn, req_info); - auto flowFile = std::static_pointer_cast<FlowFileRecord>(session->create()); + if (!auth_request(conn, req_info)) { + return true; + } + + auto session = session_factory_->createSession(); + auto flow_file = std::static_pointer_cast<FlowFileRecord>(session->create()); - if (!flowFile) { - sendErrorResponse(conn); + if (!flow_file) { + send_error_response(conn); return true; } try { - session->write(flowFile, &callback); - - // Add filename from "filename" header value (and pattern headers) - for (int i = 0; i < req_info->num_headers; i++) { - auto header = &req_info->http_headers[i]; - - if (strcmp("filename", header->name) == 0) { - if (!flowFile->updateAttribute("filename", header->value)) { - flowFile->addAttribute("filename", header->value); - } - } else if (std::regex_match(header->name, _headersAsAttributesRegex)) { - if (!flowFile->updateAttribute(header->name, header->value)) { - flowFile->addAttribute(header->name, header->value); - } - } - } + set_header_attributes(req_info, flow_file); - session->transfer(flowFile, Success); + session->transfer(flow_file, Success); session->commit(); } catch (std::exception &exception) { logger_->log_error("ListenHTTP Caught Exception %s", exception.what()); - sendErrorResponse(conn); + send_error_response(conn); session->rollback(); throw; } catch (...) { logger_->log_error("ListenHTTP Caught Exception Processor::onTrigger"); - sendErrorResponse(conn); + send_error_response(conn); session->rollback(); throw; } - mg_printf(conn, "HTTP/1.1 200 OK\r\n" - "Content-Type: text/html\r\n" - "Content-Length: 0\r\n\r\n"); + mg_printf(conn, "HTTP/1.1 200 OK\r\n"); + write_body(conn, req_info); return true; } +void ListenHTTP::Handler::write_body(mg_connection *conn, const mg_request_info *req_info) { + const auto &request_uri_str = std::string(req_info->request_uri); + + if (request_uri_str.size() > base_uri_.size() + 1) { + struct response_body response{}; + + { + // Attempt to minimize time holding mutex (it would be nice to have a lock-free concurrent map here) + std::lock_guard<std::mutex> guard(uri_map_mutex_); + std::string req_uri = request_uri_str.substr(base_uri_.size() + 1); + + if (response_uri_map_.count(req_uri)) { + response = response_uri_map_[req_uri]; + } + } + + if (!response.body.empty()) { + logger_->log_debug("Writing response body of %lu bytes for URI: %s", + response.body.size(), + req_info->request_uri); + mg_printf(conn, "Content-type: "); + mg_printf(conn, response.mime_type.c_str()); + mg_printf(conn, "\r\n"); + mg_printf(conn, "Content-length: "); + mg_printf(conn, std::to_string(response.body.size()).c_str()); + mg_printf(conn, "\r\n\r\n"); + mg_printf(conn, response.body.c_str()); + + } else { + logger_->log_debug("No response body available for URI: %s", req_info->request_uri); + mg_printf(conn, "Content-length: 0\r\n\r\n"); + } + } else { + logger_->log_debug("No response body available for URI: %s", req_info->request_uri); + mg_printf(conn, "Content-length: 0\r\n\r\n"); + } +} + ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo) : logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger()) { - _conn = conn; - _reqInfo = reqInfo; + conn_ = conn; + req_info_ = reqInfo; } int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) { int64_t rlen; int64_t nlen = 0; - int64_t tlen = _reqInfo->content_length; + int64_t tlen = req_info_->content_length; uint8_t buf[16384]; // if we have no content length we should call mg_read until @@ -306,12 +427,12 @@ int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> strea while (tlen == -1 || nlen < tlen) { rlen = tlen == -1 ? sizeof(buf) : tlen - nlen; - if (rlen > (int64_t)sizeof(buf)) { - rlen = (int64_t)sizeof(buf); + if (rlen > (int64_t) sizeof(buf)) { + rlen = (int64_t) sizeof(buf); } // Read a buffer of data from client - rlen = mg_read(_conn, &buf[0], (size_t) rlen); + rlen = mg_read(conn_, &buf[0], (size_t) rlen); if (rlen <= 0) { break; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0b85b698/extensions/civetweb/processors/ListenHTTP.h ---------------------------------------------------------------------- diff --git a/extensions/civetweb/processors/ListenHTTP.h b/extensions/civetweb/processors/ListenHTTP.h index 5199d19..08464af 100644 --- a/extensions/civetweb/processors/ListenHTTP.h +++ b/extensions/civetweb/processors/ListenHTTP.h @@ -24,6 +24,7 @@ #include <regex> #include <CivetServer.h> +#include <concurrentqueue.h> #include "FlowFileRecord.h" #include "core/Processor.h" @@ -53,7 +54,7 @@ class ListenHTTP : public core::Processor { // Destructor virtual ~ListenHTTP(); // Processor Name - static constexpr char const* ProcessorName = "ListenHTTP"; + static constexpr char const *ProcessorName = "ListenHTTP"; // Supported Properties static core::Property BasePath; static core::Property Port; @@ -70,23 +71,80 @@ class ListenHTTP : public core::Processor { void initialize(); void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory); + struct response_body { + std::string uri; + std::string mime_type; + std::string body; + }; + // HTTP request handler class Handler : public CivetHandler { public: - Handler(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, std::string &&headersAsAttributesPattern); + Handler(std::string base_uri, + core::ProcessContext *context, + core::ProcessSessionFactory *sessionFactory, + std::string &&authDNPattern, + std::string &&headersAsAttributesPattern); bool handlePost(CivetServer *server, struct mg_connection *conn); + bool handleGet(CivetServer *server, struct mg_connection *conn); + + /** + * Sets a static response body string to be used for a given URI, with a number of seconds it will be kept in memory. + * @param response + */ + void set_response_body(struct response_body response) { + std::lock_guard<std::mutex> guard(uri_map_mutex_); + + if (response.body.empty()) { + logger_->log_info("Unregistering response body for URI '%s'", + response.uri); + response_uri_map_.erase(response.uri); + } else { + logger_->log_info("Registering response body for URI '%s' of length %lu", + response.uri, + response.body.size()); + response_uri_map_[response.uri] = std::move(response); + } + } private: // Send HTTP 500 error response to client - void sendErrorResponse(struct mg_connection *conn); + void send_error_response(struct mg_connection *conn); + bool auth_request(mg_connection *conn, const mg_request_info *req_info) const; + void set_header_attributes(const mg_request_info *req_info, const std::shared_ptr<FlowFileRecord> &flow_file) const; + void write_body(mg_connection *conn, const mg_request_info *req_info); - std::regex _authDNRegex; - std::regex _headersAsAttributesRegex; - core::ProcessContext *_processContext; - core::ProcessSessionFactory *_processSessionFactory; + std::string base_uri_; + std::regex auth_dn_regex_; + std::regex headers_as_attrs_regex_; + core::ProcessContext *process_context_; + core::ProcessSessionFactory *session_factory_; // Logger std::shared_ptr<logging::Logger> logger_; + std::map<std::string, response_body> response_uri_map_; + std::mutex uri_map_mutex_; + }; + + class ResponseBodyReadCallback : public InputStreamCallback { + public: + explicit ResponseBodyReadCallback(std::string *out_str) + : out_str_(out_str) { + } + int64_t process(std::shared_ptr<io::BaseStream> stream) { + out_str_->resize(stream->getSize()); + auto num_read = stream->readData(reinterpret_cast<uint8_t *>(&(*out_str_)[0]), + static_cast<int>(stream->getSize())); + + if (num_read != stream->getSize()) { + throw std::runtime_error("GraphReadCallback failed to fully read flow file input stream"); + } + + return num_read; + } + + private: + std::string *out_str_; }; // Write callback for transferring data from HTTP request to content repo @@ -99,16 +157,16 @@ class ListenHTTP : public core::Processor { // Logger std::shared_ptr<logging::Logger> logger_; - struct mg_connection *_conn; - const struct mg_request_info *_reqInfo; + struct mg_connection *conn_; + const struct mg_request_info *req_info_; }; private: // Logger std::shared_ptr<logging::Logger> logger_; - std::unique_ptr<CivetServer> _server; - std::unique_ptr<Handler> _handler; + std::unique_ptr<CivetServer> server_; + std::unique_ptr<Handler> handler_; }; REGISTER_RESOURCE(ListenHTTP); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0b85b698/libminifi/test/civetweb-tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libminifi/test/civetweb-tests/CMakeLists.txt b/libminifi/test/civetweb-tests/CMakeLists.txt new file mode 100644 index 0000000..eb0fe27 --- /dev/null +++ b/libminifi/test/civetweb-tests/CMakeLists.txt @@ -0,0 +1,39 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +file(GLOB CIVETWEB_INTEGRATION_TESTS "*.cpp") +SET(CIVETWEB-EXTENSIONS_TEST_COUNT 0) +FOREACH(testfile ${CIVETWEB_INTEGRATION_TESTS}) + get_filename_component(testfilename "${testfile}" NAME_WE) + add_executable("${testfilename}" "${testfile}") + target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/civetweb") + target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/thirdparty/civetweb-1.10/include") + + if (APPLE) + target_link_libraries (${testfilename} -Wl,-all_load minifi-civet-extensions minifi-http-curl) + else () + target_link_libraries (${testfilename} -Wl,--whole-archive minifi-civet-extensions minifi-http-curl -Wl,--no-whole-archive) + endif () + + createTests("${testfilename}") + target_link_libraries(${testfilename} ${CATCH_MAIN_LIB}) + MATH(EXPR CIVETWEB-EXTENSIONS_TEST_COUNT "${CIVETWEB-EXTENSIONS_TEST_COUNT}+1") + add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR}) +ENDFOREACH() +message("-- Finished building ${CIVETWEB-EXTENSIONS_TEST_COUNT} civetweb related test file(s)...") http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0b85b698/libminifi/test/civetweb-tests/CivetwebTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/civetweb-tests/CivetwebTests.cpp b/libminifi/test/civetweb-tests/CivetwebTests.cpp new file mode 100644 index 0000000..8971e64 --- /dev/null +++ b/libminifi/test/civetweb-tests/CivetwebTests.cpp @@ -0,0 +1,113 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <uuid/uuid.h> +#include <fstream> +#include <map> +#include <memory> +#include <set> +#include <iostream> +#include <GenerateFlowFile.h> +#include <UpdateAttribute.h> +#include <LogAttribute.h> +#include <processors/ListenHTTP.h> + +#include "../TestBase.h" + +#include "processors/GetFile.h" +#include "processors/PutFile.h" +#include "../../../extensions/http-curl/client/HTTPClient.h" + +TEST_CASE("Test Creation of ListenHTTP", "[ListenHTTPreate]") { // NOLINT + TestController testController; + std::shared_ptr<core::Processor> + processor = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("processorname"); + REQUIRE(processor->getName() == "processorname"); +} + +TEST_CASE("Test GET Body", "[ListenHTTPGETBody]") { // NOLINT + TestController testController; + + LogTestController::getInstance().setTrace<TestPlan>(); + LogTestController::getInstance().setTrace<processors::GenerateFlowFile>(); + LogTestController::getInstance().setTrace<processors::UpdateAttribute>(); + LogTestController::getInstance().setTrace<processors::LogAttribute>(); + LogTestController::getInstance().setTrace<processors::ListenHTTP>(); + LogTestController::getInstance().setTrace<processors::ListenHTTP::Handler>(); + + auto plan = testController.createPlan(); + auto repo = std::make_shared<TestRepository>(); + + // Define directory for test input + std::string test_in_dir("/tmp/gt.XXXXXX"); + REQUIRE(testController.createTempDirectory(&test_in_dir[0]) != nullptr); + + // Define test input file + std::string test_input_file(test_in_dir); + test_input_file.append("/test"); + { + std::ofstream os(test_input_file); + os << "Hello response body" << std::endl; + } + + // Build MiNiFi processing graph + auto get = plan->addProcessor( + "GetFile", + "Get"); + plan->setProperty( + get, + "Input Directory", + test_in_dir); + auto update = plan->addProcessor( + "UpdateAttribute", + "Update", + core::Relationship("success", "description"), + true); + plan->setProperty( + update, + "http.type", + "response_body", + true); + auto log = plan->addProcessor( + "LogAttribute", + "Log", + core::Relationship("success", "description"), + true); + auto listen = plan->addProcessor( + "ListenHTTP", + "ListenHTTP", + core::Relationship("success", "description"), + true); + plan->setProperty( + listen, + "Listening Port", + "8888"); + listen->setAutoTerminatedRelationships({{"success", ""}}); + + plan->runNextProcessor(); // Get + plan->runNextProcessor(); // Update + plan->runNextProcessor(); // Log + plan->runNextProcessor(); // Listen + + sleep(1); + utils::HTTPClient client("http://localhost:8888/contentListener/test"); + REQUIRE(client.submit()); + const auto &body_chars = client.getResponseBody(); + std::string response_body(body_chars.data(), body_chars.size()); + REQUIRE("Hello response body\n" == response_body); +}
