Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 18e87d2c1 -> 0b85b6983


MINIFICPP-443 Add support for GET requests in ListenHTTP and allow response 
body to be configured

This closes #292.

Signed-off-by: Marc Parisi <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/0b85b698
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/0b85b698
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/0b85b698

Branch: refs/heads/master
Commit: 0b85b6983393f5b0258ad3b33301d892feb21b0f
Parents: 18e87d2
Author: Andrew I. Christianson <[email protected]>
Authored: Wed Mar 21 21:20:54 2018 -0400
Committer: Marc Parisi <[email protected]>
Committed: Thu Apr 19 19:59:59 2018 -0400

----------------------------------------------------------------------
 CMakeLists.txt                                  |   2 +-
 PROCESSORS.md                                   |  13 +-
 extensions/civetweb/processors/ListenHTTP.cpp   | 351 +++++++++++++------
 extensions/civetweb/processors/ListenHTTP.h     |  80 ++++-
 libminifi/test/civetweb-tests/CMakeLists.txt    |  39 +++
 libminifi/test/civetweb-tests/CivetwebTests.cpp | 113 ++++++
 6 files changed, 469 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0b85b698/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 328568c..163e637 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -204,7 +204,7 @@ endif()
 
 option(DISABLE_CIVET "Disables CivetWeb components." OFF)
 if (NOT DISABLE_CIVET)
-createExtension(CIVETWEB CIVETWEB "This enables ListenHTTP" 
"extensions/civetweb")
+       createExtension(CIVETWEB CIVETWEB "This enables ListenHTTP" 
"extensions/civetweb" "${TEST_DIR}/civetweb-tests")
 endif()
 
 if (NOT DISABLE_CURL AND NOT DISABLE_CONTROLLER)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0b85b698/PROCESSORS.md
----------------------------------------------------------------------
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 8fb870a..fe55b1c 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -452,10 +452,19 @@ default values, and whether a property supports the NiFi 
Expression Language.
 
 Starts an HTTP Server and listens on a given base path to transform incoming
 requests into FlowFiles. The default URI of the Service will be
-`http://{hostname}:{port}/contentListener`. Only HEAD and POST requests are
-supported. GET, PUT, and DELETE will result in an error and the HTTP response
+`http://{hostname}:{port}/contentListener`. Only HEAD, POST, and GET requests 
are
+supported. PUT and DELETE will result in an error and the HTTP response
 status code 405.
 
+The response body text for all requests, by default, is empty (length of 0). A
+static response body can be set for a given URI by sending input files to
+ListenHTTP with the `http.type` attribute set to `response_body`. The response
+body FlowFile `filename` attribute is appended to the `Base Path` property
+(separated by a `/`) when mapped to incoming requests. The `mime.type`
+attribute of the response body FlowFile is used for the `Content-type` header
+in responses. Response body content can be cleared by sending an empty (size 0)
+FlowFile for a given URI mapping.
+
 ### Properties
 
 In the list below, the names of required properties appear in bold. Any other

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0b85b698/extensions/civetweb/processors/ListenHTTP.cpp
----------------------------------------------------------------------
diff --git a/extensions/civetweb/processors/ListenHTTP.cpp 
b/extensions/civetweb/processors/ListenHTTP.cpp
index d94df9b..46e65e8 100644
--- a/extensions/civetweb/processors/ListenHTTP.cpp
+++ b/extensions/civetweb/processors/ListenHTTP.cpp
@@ -19,22 +19,6 @@
  * limitations under the License.
  */
 #include "ListenHTTP.h"
-#include <uuid/uuid.h>
-#include <CivetServer.h>
-#include <stdio.h>
-#include <sstream>
-#include <utility>
-#include <memory>
-#include <string>
-#include <iostream>
-#include <fstream>
-#include <set>
-#include <vector>
-#include "utils/TimeUtil.h"
-#include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
-#include "core/ProcessSessionFactory.h"
-#include "core/logging/LoggerConfiguration.h"
 
 namespace org {
 namespace apache {
@@ -44,15 +28,21 @@ namespace processors {
 
 core::Property ListenHTTP::BasePath("Base Path", "Base path for incoming 
connections", "contentListener");
 core::Property ListenHTTP::Port("Listening Port", "The Port to listen on for 
incoming connections", "");
-core::Property ListenHTTP::AuthorizedDNPattern("Authorized DN Pattern", "A 
Regular Expression to apply against the Distinguished Name of incoming"
-                                               " connections. If the Pattern 
does not match the DN, the connection will be refused.",
-                                               ".*");
-core::Property ListenHTTP::SSLCertificate("SSL Certificate", "File containing 
PEM-formatted file including TLS/SSL certificate and key", "");
-core::Property ListenHTTP::SSLCertificateAuthority("SSL Certificate 
Authority", "File containing trusted PEM-formatted certificates", "");
-core::Property ListenHTTP::SSLVerifyPeer("SSL Verify Peer", "Whether or not to 
verify the client's certificate (yes/no)", "no");
-core::Property ListenHTTP::SSLMinimumVersion("SSL Minimum Version", "Minimum 
TLS/SSL version allowed (SSL2, SSL3, TLS1.0, TLS1.1, TLS1.2)", "SSL2");
-core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive 
as Attributes (Regex)", "Specifies the Regular Expression that determines the 
names of HTTP Headers that"
-                                                    " should be passed along 
as FlowFile attributes",
+core::Property ListenHTTP::AuthorizedDNPattern
+    ("Authorized DN Pattern", "A Regular Expression to apply against the 
Distinguished Name of incoming"
+         " connections. If the Pattern does not match the DN, the connection 
will be refused.",
+     ".*");
+core::Property ListenHTTP::SSLCertificate
+    ("SSL Certificate", "File containing PEM-formatted file including TLS/SSL 
certificate and key", "");
+core::Property ListenHTTP::SSLCertificateAuthority
+    ("SSL Certificate Authority", "File containing trusted PEM-formatted 
certificates", "");
+core::Property
+    ListenHTTP::SSLVerifyPeer("SSL Verify Peer", "Whether or not to verify the 
client's certificate (yes/no)", "no");
+core::Property ListenHTTP::SSLMinimumVersion
+    ("SSL Minimum Version", "Minimum TLS/SSL version allowed (SSL2, SSL3, 
TLS1.0, TLS1.1, TLS1.2)", "SSL2");
+core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive 
as Attributes (Regex)",
+                                                    "Specifies the Regular 
Expression that determines the names of HTTP Headers that"
+                                                        " should be passed 
along as FlowFile attributes",
                                                     "");
 
 core::Relationship ListenHTTP::Success("success", "All files are routed to 
success");
@@ -81,7 +71,9 @@ void ListenHTTP::onSchedule(core::ProcessContext *context, 
core::ProcessSessionF
   std::string basePath;
 
   if (!context->getProperty(BasePath.getName(), basePath)) {
-    logger_->log_info("%s attribute is missing, so default value of %s will be 
used", BasePath.getName(), BasePath.getValue());
+    logger_->log_info("%s attribute is missing, so default value of %s will be 
used",
+                      BasePath.getName(),
+                      BasePath.getValue());
     basePath = BasePath.getValue();
   }
 
@@ -112,7 +104,8 @@ void ListenHTTP::onSchedule(core::ProcessContext *context, 
core::ProcessSessionF
   std::string sslMinVer;
 
   if (!sslCertFile.empty()) {
-    if (context->getProperty(SSLCertificateAuthority.getName(), 
sslCertAuthorityFile) && !sslCertAuthorityFile.empty()) {
+    if (context->getProperty(SSLCertificateAuthority.getName(), 
sslCertAuthorityFile)
+        && !sslCertAuthorityFile.empty()) {
       logger_->log_debug("ListenHTTP using %s: %s", 
SSLCertificateAuthority.getName(), sslCertAuthorityFile);
     }
 
@@ -133,172 +126,300 @@ void ListenHTTP::onSchedule(core::ProcessContext 
*context, core::ProcessSessionF
 
   std::string headersAsAttributesPattern;
 
-  if (context->getProperty(HeadersAsAttributesRegex.getName(), 
headersAsAttributesPattern) && !headersAsAttributesPattern.empty()) {
+  if (context->getProperty(HeadersAsAttributesRegex.getName(), 
headersAsAttributesPattern)
+      && !headersAsAttributesPattern.empty()) {
     logger_->log_debug("ListenHTTP using %s: %s", 
HeadersAsAttributesRegex.getName(), headersAsAttributesPattern);
   }
 
   auto numThreads = getMaxConcurrentTasks();
 
-  logger_->log_info("ListenHTTP starting HTTP server on port %s and path %s 
with %d threads", listeningPort, basePath, numThreads);
+  logger_->log_info("ListenHTTP starting HTTP server on port %s and path %s 
with %d threads",
+                    listeningPort,
+                    basePath,
+                    numThreads);
 
   // Initialize web server
   std::vector<std::string> options;
-  options.push_back("enable_keep_alive");
-  options.push_back("yes");
-  options.push_back("keep_alive_timeout_ms");
-  options.push_back("15000");
-  options.push_back("num_threads");
-  options.push_back(std::to_string(numThreads));
+  options.emplace_back("enable_keep_alive");
+  options.emplace_back("yes");
+  options.emplace_back("keep_alive_timeout_ms");
+  options.emplace_back("15000");
+  options.emplace_back("num_threads");
+  options.emplace_back(std::to_string(numThreads));
 
   if (sslCertFile.empty()) {
-    options.push_back("listening_ports");
-    options.push_back(listeningPort);
+    options.emplace_back("listening_ports");
+    options.emplace_back(listeningPort);
   } else {
     listeningPort += "s";
-    options.push_back("listening_ports");
-    options.push_back(listeningPort);
+    options.emplace_back("listening_ports");
+    options.emplace_back(listeningPort);
 
-    options.push_back("ssl_certificate");
-    options.push_back(sslCertFile);
+    options.emplace_back("ssl_certificate");
+    options.emplace_back(sslCertFile);
 
     if (!sslCertAuthorityFile.empty()) {
-      options.push_back("ssl_ca_file");
-      options.push_back(sslCertAuthorityFile);
+      options.emplace_back("ssl_ca_file");
+      options.emplace_back(sslCertAuthorityFile);
     }
 
-    if (sslVerifyPeer.empty() || sslVerifyPeer.compare("no") == 0) {
-      options.push_back("ssl_verify_peer");
-      options.push_back("no");
+    if (sslVerifyPeer.empty() || sslVerifyPeer == "no") {
+      options.emplace_back("ssl_verify_peer");
+      options.emplace_back("no");
     } else {
-      options.push_back("ssl_verify_peer");
-      options.push_back("yes");
+      options.emplace_back("ssl_verify_peer");
+      options.emplace_back("yes");
     }
 
-    if (sslMinVer.compare("SSL2") == 0) {
-      options.push_back("ssl_protocol_version");
-      options.push_back(std::to_string(0));
-    } else if (sslMinVer.compare("SSL3") == 0) {
-      options.push_back("ssl_protocol_version");
-      options.push_back(std::to_string(1));
-    } else if (sslMinVer.compare("TLS1.0") == 0) {
-      options.push_back("ssl_protocol_version");
-      options.push_back(std::to_string(2));
-    } else if (sslMinVer.compare("TLS1.1") == 0) {
-      options.push_back("ssl_protocol_version");
-      options.push_back(std::to_string(3));
+    if (sslMinVer == "SSL2") {
+      options.emplace_back("ssl_protocol_version");
+      options.emplace_back(std::to_string(0));
+    } else if (sslMinVer == "SSL3") {
+      options.emplace_back("ssl_protocol_version");
+      options.emplace_back(std::to_string(1));
+    } else if (sslMinVer == "TLS1.0") {
+      options.emplace_back("ssl_protocol_version");
+      options.emplace_back(std::to_string(2));
+    } else if (sslMinVer == "TLS1.1") {
+      options.emplace_back("ssl_protocol_version");
+      options.emplace_back(std::to_string(3));
     } else {
-      options.push_back("ssl_protocol_version");
-      options.push_back(std::to_string(4));
+      options.emplace_back("ssl_protocol_version");
+      options.emplace_back(std::to_string(4));
     }
   }
 
-  _server.reset(new CivetServer(options));
-  _handler.reset(new Handler(context, sessionFactory, 
std::move(authDNPattern), std::move(headersAsAttributesPattern)));
-  _server->addHandler(basePath, _handler.get());
+  server_.reset(new CivetServer(options));
+  handler_.reset(new Handler(basePath,
+                             context,
+                             sessionFactory,
+                             std::move(authDNPattern),
+                             std::move(headersAsAttributesPattern)));
+  server_->addHandler(basePath, handler_.get());
 }
 
 ListenHTTP::~ListenHTTP() {
 }
 
 void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession 
*session) {
-  std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->get());
+  std::shared_ptr<FlowFileRecord> flow_file = 
std::static_pointer_cast<FlowFileRecord>(session->get());
 
   // Do nothing if there are no incoming files
-  if (!flowFile) {
+  if (!flow_file) {
     return;
   }
+
+  std::string type;
+  flow_file->getAttribute("http.type", type);
+
+  if (type == "response_body") {
+
+    if (handler_) {
+      struct response_body response{"", "", ""};
+      ResponseBodyReadCallback cb(&response.body);
+      flow_file->getAttribute("filename", response.uri);
+      flow_file->getAttribute("mime.type", response.mime_type);
+      if (response.mime_type.empty()) {
+        logger_->log_warn("Using default mime type of application/octet-stream 
for response body file: %s",
+                          response.uri);
+        response.mime_type = "application/octet-stream";
+      }
+      session->read(flow_file, &cb);
+      handler_->set_response_body(std::move(response));
+    }
+  }
+
+  session->remove(flow_file);
 }
 
-ListenHTTP::Handler::Handler(core::ProcessContext *context, 
core::ProcessSessionFactory *sessionFactory, std::string &&authDNPattern, 
std::string &&headersAsAttributesPattern)
-    : _authDNRegex(std::move(authDNPattern)),
-      _headersAsAttributesRegex(std::move(headersAsAttributesPattern)),
+ListenHTTP::Handler::Handler(std::string base_uri,
+                             core::ProcessContext *context,
+                             core::ProcessSessionFactory *session_factory,
+                             std::string &&auth_dn_regex,
+                             std::string &&header_as_attrs_regex)
+    : base_uri_(std::move(base_uri)),
+      auth_dn_regex_(std::move(auth_dn_regex)),
+      headers_as_attrs_regex_(std::move(header_as_attrs_regex)),
       logger_(logging::LoggerFactory<ListenHTTP::Handler>::getLogger()) {
-  _processContext = context;
-  _processSessionFactory = sessionFactory;
+  process_context_ = context;
+  session_factory_ = session_factory;
+}
+
+// Writes an HTTP 500 (Internal Server Error) response with an empty body
+// (Content-Length: 0) to the given connection.
+void ListenHTTP::Handler::send_error_response(struct mg_connection *conn) {
+  mg_printf(conn,
+            "HTTP/1.1 500 Internal Server Error\r\n"
+                "Content-Type: text/html\r\n"
+                "Content-Length: 0\r\n\r\n");
 }
 
-void ListenHTTP::Handler::sendErrorResponse(struct mg_connection *conn) {
-  mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"
-            "Content-Type: text/html\r\n"
-            "Content-Length: 0\r\n\r\n");
+// Copies selected request metadata onto a flow file as attributes:
+//  - a header named exactly "filename" (case-sensitive strcmp) sets the
+//    "filename" attribute,
+//  - any other header whose name fully matches headers_as_attrs_regex_ is
+//    stored under its own header name,
+//  - a non-null query string is stored as "http.query".
+// Header attributes are updated in place when already present, otherwise added.
+void ListenHTTP::Handler::set_header_attributes(const mg_request_info 
*req_info,
+                                                const 
std::shared_ptr<FlowFileRecord> &flow_file) const {
+  // Add filename from "filename" header value (and pattern headers)
+  for (int i = 0; i < req_info->num_headers; i++) {
+    auto header = &req_info->http_headers[i];
+
+    if (strcmp("filename", header->name) == 0) {
+      // updateAttribute() reports failure when the attribute does not exist yet
+      if (!flow_file->updateAttribute("filename", header->value)) {
+        flow_file->addAttribute("filename", header->value);
+      }
+    } else if (std::regex_match(header->name, headers_as_attrs_regex_)) {
+      if (!flow_file->updateAttribute(header->name, header->value)) {
+        flow_file->addAttribute(header->name, header->value);
+      }
+    }
+  }
+
+  // query_string is only set on the request info when the URI carried one
+  if (req_info->query_string) {
+    flow_file->addAttribute("http.query", req_info->query_string);
+  }
 }
 
+// Handles a POST request: authorizes the peer, stores the request body in a
+// new flow file, copies header/query attributes onto it, routes it to Success
+// and then answers 200 with the static response body (if any) registered for
+// the request URI. Every return path returns true; exceptions raised while
+// writing or committing are logged, answered with a 500, rolled back and
+// rethrown.
 bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection 
*conn) {
   auto req_info = mg_get_request_info(conn);
+  // NOTE(review): "%ll" is not a complete printf conversion; "%lld" was
+  // presumably intended for content_length - confirm against the logger's
+  // formatting rules.
   logger_->log_debug("ListenHTTP handling POST request of length %ll", 
req_info->content_length);
 
+  // auth_request() has already written the 403 response when it returns false
+  if (!auth_request(conn, req_info)) {
+    return true;
+  }
+
+  // Always send 100 Continue, as allowed per standard to minimize client delay
+  // (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html)
+  mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n");
+
+  auto session = session_factory_->createSession();
+  ListenHTTP::WriteCallback callback(conn, req_info);
+  auto flow_file = std::static_pointer_cast<FlowFileRecord>(session->create());
+
+  if (!flow_file) {
+    send_error_response(conn);
+    return true;
+  }
+
+  try {
+    // The callback reads the request body from the connection into the flow file
+    session->write(flow_file, &callback);
+    set_header_attributes(req_info, flow_file);
+
+    session->transfer(flow_file, Success);
+    session->commit();
+  } catch (std::exception &exception) {
+    logger_->log_error("ListenHTTP Caught Exception %s", exception.what());
+    send_error_response(conn);
+    session->rollback();
+    throw;
+  } catch (...) {
+    logger_->log_error("ListenHTTP Caught Exception Processor::onTrigger");
+    send_error_response(conn);
+    session->rollback();
+    throw;
+  }
+
+  // Only the status line is written here; write_body() emits the remaining
+  // headers and the body
+  mg_printf(conn, "HTTP/1.1 200 OK\r\n");
+  write_body(conn, req_info);
+
+  return true;
+}
+
+bool ListenHTTP::Handler::auth_request(mg_connection *conn, const 
mg_request_info *req_info) const {
   // If this is a two-way TLS connection, authorize the peer against the 
configured pattern
+  bool authorized = true;
   if (req_info->is_ssl && req_info->client_cert != nullptr) {
-    if (!std::regex_match(req_info->client_cert->subject, _authDNRegex)) {
+    if (!std::regex_match(req_info->client_cert->subject, auth_dn_regex_)) {
       mg_printf(conn, "HTTP/1.1 403 Forbidden\r\n"
-                "Content-Type: text/html\r\n"
-                "Content-Length: 0\r\n\r\n");
+          "Content-Type: text/html\r\n"
+          "Content-Length: 0\r\n\r\n");
       logger_->log_warn("ListenHTTP client DN not authorized: %s", 
req_info->client_cert->subject);
-      return true;
+      authorized = false;
     }
   }
+  return authorized;
+}
 
-  // Always send 100 Continue, as allowed per standard to minimize client 
delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html)
-  mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n");
+bool ListenHTTP::Handler::handleGet(CivetServer *server, struct mg_connection 
*conn) {
+  auto req_info = mg_get_request_info(conn);
+  logger_->log_debug("ListenHTTP handling GET request of URI %s", 
req_info->request_uri);
 
-  auto session = _processSessionFactory->createSession();
-  ListenHTTP::WriteCallback callback(conn, req_info);
-  auto flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
+  if (!auth_request(conn, req_info)) {
+    return true;
+  }
+
+  auto session = session_factory_->createSession();
+  auto flow_file = std::static_pointer_cast<FlowFileRecord>(session->create());
 
-  if (!flowFile) {
-    sendErrorResponse(conn);
+  if (!flow_file) {
+    send_error_response(conn);
     return true;
   }
 
   try {
-    session->write(flowFile, &callback);
-
-    // Add filename from "filename" header value (and pattern headers)
-    for (int i = 0; i < req_info->num_headers; i++) {
-      auto header = &req_info->http_headers[i];
-
-      if (strcmp("filename", header->name) == 0) {
-        if (!flowFile->updateAttribute("filename", header->value)) {
-          flowFile->addAttribute("filename", header->value);
-        }
-      } else if (std::regex_match(header->name, _headersAsAttributesRegex)) {
-        if (!flowFile->updateAttribute(header->name, header->value)) {
-          flowFile->addAttribute(header->name, header->value);
-        }
-      }
-    }
+    set_header_attributes(req_info, flow_file);
 
-    session->transfer(flowFile, Success);
+    session->transfer(flow_file, Success);
     session->commit();
   } catch (std::exception &exception) {
     logger_->log_error("ListenHTTP Caught Exception %s", exception.what());
-    sendErrorResponse(conn);
+    send_error_response(conn);
     session->rollback();
     throw;
   } catch (...) {
     logger_->log_error("ListenHTTP Caught Exception Processor::onTrigger");
-    sendErrorResponse(conn);
+    send_error_response(conn);
     session->rollback();
     throw;
   }
 
-  mg_printf(conn, "HTTP/1.1 200 OK\r\n"
-            "Content-Type: text/html\r\n"
-            "Content-Length: 0\r\n\r\n");
+  mg_printf(conn, "HTTP/1.1 200 OK\r\n");
+  write_body(conn, req_info);
 
   return true;
 }
 
+/**
+ * Completes a response whose status line has already been sent by writing the
+ * remaining headers and, when one is registered for the request URI, the
+ * static response body.
+ *
+ * The lookup key is the part of the request URI that follows the base URI plus
+ * one further character. When the URI is too short to contain a key, or no
+ * non-empty body is registered under it, an empty response
+ * (Content-length: 0) is written instead.
+ *
+ * @param conn connection to write the headers and body to
+ * @param req_info request information supplying the request URI
+ */
+void ListenHTTP::Handler::write_body(mg_connection *conn, const mg_request_info *req_info) {
+  const std::string request_uri_str(req_info->request_uri);
+  struct response_body response{};
+
+  if (request_uri_str.size() > base_uri_.size() + 1) {
+    const std::string req_uri = request_uri_str.substr(base_uri_.size() + 1);
+
+    // Attempt to minimize time holding mutex: only copy the entry out of the map here
+    std::lock_guard<std::mutex> guard(uri_map_mutex_);
+    const auto entry = response_uri_map_.find(req_uri);
+
+    if (entry != response_uri_map_.end()) {
+      response = entry->second;
+    }
+  }
+
+  if (response.body.empty()) {
+    logger_->log_debug("No response body available for URI: %s", req_info->request_uri);
+    mg_printf(conn, "Content-length: 0\r\n\r\n");
+    return;
+  }
+
+  logger_->log_debug("Writing response body of %lu bytes for URI: %s",
+                     response.body.size(),
+                     req_info->request_uri);
+
+  // The MIME type and body originate from flow files, so they must never be used
+  // as the printf format string: a '%' in them would be parsed as a conversion.
+  mg_printf(conn, "Content-type: %s\r\n", response.mime_type.c_str());
+  mg_printf(conn, "Content-length: %lu\r\n\r\n", static_cast<unsigned long>(response.body.size()));
+
+  // Raw write so that '%' and embedded NUL bytes in the body are sent unchanged
+  mg_write(conn, response.body.data(), response.body.size());
+}
+
 ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const 
struct mg_request_info *reqInfo)
     : logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger()) {
-  _conn = conn;
-  _reqInfo = reqInfo;
+  conn_ = conn;
+  req_info_ = reqInfo;
 }
 
 int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> 
stream) {
   int64_t rlen;
   int64_t nlen = 0;
-  int64_t tlen = _reqInfo->content_length;
+  int64_t tlen = req_info_->content_length;
   uint8_t buf[16384];
 
   // if we have no content length we should call mg_read until
@@ -306,12 +427,12 @@ int64_t 
ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> strea
   while (tlen == -1 || nlen < tlen) {
     rlen = tlen == -1 ? sizeof(buf) : tlen - nlen;
 
-    if (rlen > (int64_t)sizeof(buf)) {
-      rlen = (int64_t)sizeof(buf);
+    if (rlen > (int64_t) sizeof(buf)) {
+      rlen = (int64_t) sizeof(buf);
     }
 
     // Read a buffer of data from client
-    rlen = mg_read(_conn, &buf[0], (size_t) rlen);
+    rlen = mg_read(conn_, &buf[0], (size_t) rlen);
 
     if (rlen <= 0) {
       break;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0b85b698/extensions/civetweb/processors/ListenHTTP.h
----------------------------------------------------------------------
diff --git a/extensions/civetweb/processors/ListenHTTP.h 
b/extensions/civetweb/processors/ListenHTTP.h
index 5199d19..08464af 100644
--- a/extensions/civetweb/processors/ListenHTTP.h
+++ b/extensions/civetweb/processors/ListenHTTP.h
@@ -24,6 +24,7 @@
 #include <regex>
 
 #include <CivetServer.h>
+#include <concurrentqueue.h>
 
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
@@ -53,7 +54,7 @@ class ListenHTTP : public core::Processor {
   // Destructor
   virtual ~ListenHTTP();
   // Processor Name
-  static constexpr char const* ProcessorName = "ListenHTTP";
+  static constexpr char const *ProcessorName = "ListenHTTP";
   // Supported Properties
   static core::Property BasePath;
   static core::Property Port;
@@ -70,23 +71,80 @@ class ListenHTTP : public core::Processor {
   void initialize();
   void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory 
*sessionFactory);
 
+  // A static response registered with the Handler for one URI.
+  struct response_body {
+    // Key the response is registered under (taken from the flow file "filename" attribute)
+    std::string uri;
+    // Value sent in the Content-type response header
+    std::string mime_type;
+    // Response payload; an empty body means "no response registered"
+    std::string body;
+  };
+
   // HTTP request handler
   class Handler : public CivetHandler {
    public:
-    Handler(core::ProcessContext *context, core::ProcessSessionFactory 
*sessionFactory, std::string &&authDNPattern, std::string 
&&headersAsAttributesPattern);
+    Handler(std::string base_uri,
+            core::ProcessContext *context,
+            core::ProcessSessionFactory *sessionFactory,
+            std::string &&authDNPattern,
+            std::string &&headersAsAttributesPattern);
     bool handlePost(CivetServer *server, struct mg_connection *conn);
+    bool handleGet(CivetServer *server, struct mg_connection *conn);
+
+    /**
+     * Registers or clears the static response body for a URI.
+     *
+     * An empty body erases any entry stored under response.uri; otherwise the
+     * response is stored under that key, replacing any previous entry. The map
+     * is modified while holding uri_map_mutex_.
+     *
+     * @param response URI key, MIME type and body to register
+     */
+    void set_response_body(struct response_body response) {
+      std::lock_guard<std::mutex> guard(uri_map_mutex_);
+
+      if (response.body.empty()) {
+        logger_->log_info("Unregistering response body for URI '%s'",
+                          response.uri);
+        response_uri_map_.erase(response.uri);
+      } else {
+        logger_->log_info("Registering response body for URI '%s' of length 
%lu",
+                          response.uri,
+                          response.body.size());
+        response_uri_map_[response.uri] = std::move(response);
+      }
+    }
 
    private:
     // Send HTTP 500 error response to client
-    void sendErrorResponse(struct mg_connection *conn);
+    void send_error_response(struct mg_connection *conn);
+    bool auth_request(mg_connection *conn, const mg_request_info *req_info) 
const;
+    void set_header_attributes(const mg_request_info *req_info, const 
std::shared_ptr<FlowFileRecord> &flow_file) const;
+    void write_body(mg_connection *conn, const mg_request_info *req_info);
 
-    std::regex _authDNRegex;
-    std::regex _headersAsAttributesRegex;
-    core::ProcessContext *_processContext;
-    core::ProcessSessionFactory *_processSessionFactory;
+    std::string base_uri_;
+    std::regex auth_dn_regex_;
+    std::regex headers_as_attrs_regex_;
+    core::ProcessContext *process_context_;
+    core::ProcessSessionFactory *session_factory_;
 
     // Logger
     std::shared_ptr<logging::Logger> logger_;
+    std::map<std::string, response_body> response_uri_map_;
+    std::mutex uri_map_mutex_;
+  };
+
+  /**
+   * Read callback that copies the entire content of an input stream into a
+   * caller-provided string.
+   */
+  class ResponseBodyReadCallback : public InputStreamCallback {
+   public:
+    /**
+     * @param out_str destination string; resized and overwritten by process()
+     */
+    explicit ResponseBodyReadCallback(std::string *out_str)
+        : out_str_(out_str) {
+    }
+
+    /**
+     * Reads the whole stream into the destination string.
+     * @param stream stream to read from
+     * @return number of bytes read
+     * @throws std::runtime_error if fewer bytes than the stream size were read
+     */
+    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      // Query the size once so the resize, the read and the check all agree
+      const auto size = stream->getSize();
+      out_str_->resize(size);
+      const auto num_read = stream->readData(reinterpret_cast<uint8_t *>(&(*out_str_)[0]),
+                                             static_cast<int>(size));
+
+      // Reject negative (error) results explicitly before comparing with the size
+      if (num_read < 0 || static_cast<decltype(size)>(num_read) != size) {
+        throw std::runtime_error("ResponseBodyReadCallback failed to fully read flow file input stream");
+      }
+
+      return num_read;
+    }
+
+   private:
+    std::string *out_str_;
   };
 
   // Write callback for transferring data from HTTP request to content repo
@@ -99,16 +157,16 @@ class ListenHTTP : public core::Processor {
     // Logger
     std::shared_ptr<logging::Logger> logger_;
 
-    struct mg_connection *_conn;
-    const struct mg_request_info *_reqInfo;
+    struct mg_connection *conn_;
+    const struct mg_request_info *req_info_;
   };
 
  private:
   // Logger
   std::shared_ptr<logging::Logger> logger_;
 
-  std::unique_ptr<CivetServer> _server;
-  std::unique_ptr<Handler> _handler;
+  std::unique_ptr<CivetServer> server_;
+  std::unique_ptr<Handler> handler_;
 };
 
 REGISTER_RESOURCE(ListenHTTP);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0b85b698/libminifi/test/civetweb-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/libminifi/test/civetweb-tests/CMakeLists.txt 
b/libminifi/test/civetweb-tests/CMakeLists.txt
new file mode 100644
index 0000000..eb0fe27
--- /dev/null
+++ b/libminifi/test/civetweb-tests/CMakeLists.txt
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Build one test executable per .cpp file matched by the glob below and
+# register each of them with CTest.
+file(GLOB CIVETWEB_INTEGRATION_TESTS "*.cpp")
+SET(CIVETWEB-EXTENSIONS_TEST_COUNT 0)
+FOREACH(testfile ${CIVETWEB_INTEGRATION_TESTS})
+  # The executable is named after the source file without its extension
+  get_filename_component(testfilename "${testfile}" NAME_WE)
+  add_executable("${testfilename}" "${testfile}")
+  target_include_directories(${testfilename} PRIVATE BEFORE 
"${CMAKE_SOURCE_DIR}/extensions/civetweb")
+  target_include_directories(${testfilename} PRIVATE BEFORE 
"${CMAKE_SOURCE_DIR}/thirdparty/civetweb-1.10/include")
+
+  # Link the extension libraries in full (all_load / whole-archive);
+  # presumably so that self-registering processors are not dropped by the
+  # linker - TODO confirm
+  if (APPLE)
+    target_link_libraries (${testfilename} -Wl,-all_load 
minifi-civet-extensions minifi-http-curl)
+  else ()
+    target_link_libraries (${testfilename} -Wl,--whole-archive 
minifi-civet-extensions minifi-http-curl -Wl,--no-whole-archive)
+  endif ()
+
+  createTests("${testfilename}")
+  target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
+  # Count the test executables for the summary message below
+  MATH(EXPR CIVETWEB-EXTENSIONS_TEST_COUNT 
"${CIVETWEB-EXTENSIONS_TEST_COUNT}+1")
+  add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY 
${TEST_DIR})
+ENDFOREACH()
+message("-- Finished building ${CIVETWEB-EXTENSIONS_TEST_COUNT} civetweb 
related test file(s)...")

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/0b85b698/libminifi/test/civetweb-tests/CivetwebTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/civetweb-tests/CivetwebTests.cpp 
b/libminifi/test/civetweb-tests/CivetwebTests.cpp
new file mode 100644
index 0000000..8971e64
--- /dev/null
+++ b/libminifi/test/civetweb-tests/CivetwebTests.cpp
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <uuid/uuid.h>
+#include <fstream>
+#include <map>
+#include <memory>
+#include <set>
+#include <iostream>
+#include <GenerateFlowFile.h>
+#include <UpdateAttribute.h>
+#include <LogAttribute.h>
+#include <processors/ListenHTTP.h>
+
+#include "../TestBase.h"
+
+#include "processors/GetFile.h"
+#include "processors/PutFile.h"
+#include "../../../extensions/http-curl/client/HTTPClient.h"
+
+// Verifies that a ListenHTTP processor can be constructed and reports the name it was given.
+TEST_CASE("Test Creation of ListenHTTP", "[ListenHTTPCreate]") {  // NOLINT
+  TestController testController;
+  std::shared_ptr<core::Processor>
+      processor = std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("processorname");
+  REQUIRE(processor->getName() == "processorname");
+}
+
+// End-to-end check of static response bodies: a file picked up by GetFile is
+// tagged with http.type=response_body, handed to ListenHTTP, and must then be
+// returned as the body of a GET request for <base path>/<filename>.
+TEST_CASE("Test GET Body", "[ListenHTTPGETBody]") {  // NOLINT
+  TestController testController;
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::GenerateFlowFile>();
+  LogTestController::getInstance().setTrace<processors::UpdateAttribute>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+  LogTestController::getInstance().setTrace<processors::ListenHTTP>();
+  LogTestController::getInstance().setTrace<processors::ListenHTTP::Handler>();
+
+  auto plan = testController.createPlan();
+  auto repo = std::make_shared<TestRepository>();
+
+  // Define directory for test input
+  std::string test_in_dir("/tmp/gt.XXXXXX");
+  REQUIRE(testController.createTempDirectory(&test_in_dir[0]) != nullptr);
+
+  // Define test input file; its name ("test") becomes the URI suffix served below
+  std::string test_input_file(test_in_dir);
+  test_input_file.append("/test");
+  {
+    std::ofstream os(test_input_file);
+    os << "Hello response body" << std::endl;
+  }
+
+  // Build MiNiFi processing graph: GetFile -> UpdateAttribute -> LogAttribute -> ListenHTTP
+  auto get = plan->addProcessor(
+      "GetFile",
+      "Get");
+  plan->setProperty(
+      get,
+      "Input Directory",
+      test_in_dir);
+  auto update = plan->addProcessor(
+      "UpdateAttribute",
+      "Update",
+      core::Relationship("success", "description"),
+      true);
+  // Mark the flow file as a response body so ListenHTTP registers it instead of discarding it
+  plan->setProperty(
+      update,
+      "http.type",
+      "response_body",
+      true);
+  auto log = plan->addProcessor(
+      "LogAttribute",
+      "Log",
+      core::Relationship("success", "description"),
+      true);
+  auto listen = plan->addProcessor(
+      "ListenHTTP",
+      "ListenHTTP",
+      core::Relationship("success", "description"),
+      true);
+  plan->setProperty(
+      listen,
+      "Listening Port",
+      "8888");
+  listen->setAutoTerminatedRelationships({{"success", ""}});
+
+  plan->runNextProcessor();  // Get
+  plan->runNextProcessor();  // Update
+  plan->runNextProcessor();  // Log
+  plan->runNextProcessor();  // Listen
+
+  sleep(1);
+  utils::HTTPClient client("http://localhost:8888/contentListener/test");
+  REQUIRE(client.submit());
+  const auto &body_chars = client.getResponseBody();
+  std::string response_body(body_chars.data(), body_chars.size());
+  REQUIRE("Hello response body\n" == response_body);
+}

Reply via email to