szaszm commented on a change in pull request #921:
URL: https://github.com/apache/nifi-minifi-cpp/pull/921#discussion_r503353221



##########
File path: extensions/civetweb/processors/ListenHTTP.cpp
##########
@@ -434,16 +456,11 @@ void ListenHTTP::Handler::write_body(mg_connection *conn, const mg_request_info
   }
 }
 
-ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo)
-    : logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger()) {
-  conn_ = conn;
-  req_info_ = reqInfo;
-}
-
-int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+std::shared_ptr<io::BufferStream> ListenHTTP::Handler::createContentBuffer(struct mg_connection *conn, const struct mg_request_info *req_info) {
+  auto content_buffer = std::make_shared<io::BufferStream>();

Review comment:
       I don't think we need any sharing here, only a few ownership transfers:
   createContentBuffer -> handlePost (temporary) -> enqueueRequest -> request buffer -> onTrigger
   
   
   ```suggestion
     auto content_buffer = utils::make_unique<io::BufferStream>();
   ```
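   
   For illustration, a minimal standalone sketch of the suggested move-only ownership chain; the function names echo the PR's, but this is not its actual code and `BufferStream` stands in for `io::BufferStream`:
   ```cpp
   #include <memory>
   #include <queue>
   #include <utility>

   struct BufferStream {};  // stand-in for io::BufferStream

   std::queue<std::unique_ptr<BufferStream>> request_buffer;

   // createContentBuffer: creates the sole owner of the freshly read request body
   std::unique_ptr<BufferStream> createContentBuffer() {
     return std::make_unique<BufferStream>();
   }

   // enqueueRequest: the temporary owner (handlePost in the PR) hands ownership into the buffer
   void enqueueRequest(std::unique_ptr<BufferStream> content_buffer) {
     request_buffer.push(std::move(content_buffer));
   }

   // onTrigger-like consumer: takes ownership back out; no reference counting needed anywhere
   void processOneRequest() {
     std::unique_ptr<BufferStream> content_buffer = std::move(request_buffer.front());
     request_buffer.pop();
   }

   int main() {
     enqueueRequest(createContentBuffer());
     processOneRequest();
   }
   ```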
   

##########
File path: extensions/civetweb/processors/ListenHTTP.cpp
##########
@@ -273,6 +325,34 @@ void ListenHTTP::Handler::set_header_attributes(const mg_request_info *req_info,
   }
 }
 
+bool ListenHTTP::Handler::enqueueRequest(mg_connection *conn, const mg_request_info *req_info, std::shared_ptr<io::BufferStream> content_buffer) {
+  auto flow_file = std::make_shared<FlowFileRecord>();
+  auto flow_version = process_context_->getProcessorNode()->getFlowIdentifier();
+  if (flow_version != nullptr) {
+    flow_file->setAttribute(core::SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId());
+  }
+
+  if (!flow_file) {
+    sendHttp500(conn);
+    return true;
+  }
+
+  setHeaderAttributes(req_info, flow_file);
+
+  if (buffer_size_ == 0 || request_buffer.size() < buffer_size_) {
+    request_buffer.enqueue(std::make_pair(std::move(flow_file), std::move(content_buffer)));
+  } else {
+    logger_->log_error("ListenHTTP buffer is full");

Review comment:
       I would only log a warning, because the other endpoint can always choose to retry later in response to HTTP 503, leaving the system functional.
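   
   A hedged sketch of the warning-level variant of that else branch; `sendHttp503` is a hypothetical helper assumed to sit alongside the `sendHttp500` shown in the diff above:
   ```cpp
     } else {
       // a full buffer is a transient, recoverable condition: warn rather than error
       logger_->log_warn("ListenHTTP buffer is full");
       sendHttp503(conn);  // hypothetical helper; the client can retry later
       return true;
     }
   ```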

##########
File path: extensions/civetweb/processors/ListenHTTP.cpp
##########
@@ -212,51 +233,82 @@ void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionF
 ListenHTTP::~ListenHTTP() = default;
 
 void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
-  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  logger_->log_debug("OnTrigger ListenHTTP");
+  processIncomingFlowFile(session);
+  processRequestBuffer(session);
+}
 
-  // Do nothing if there are no incoming files
+void ListenHTTP::processIncomingFlowFile(core::ProcessSession *session) {
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
   if (!flow_file) {
     return;
   }
 
   std::string type;
   flow_file->getAttribute("http.type", type);
 
-  if (type == "response_body") {
-
-    if (handler_) {
-      struct response_body response { "", "", "" };
-      ResponseBodyReadCallback cb(&response.body);
-      flow_file->getAttribute("filename", response.uri);
-      flow_file->getAttribute("mime.type", response.mime_type);
-      if (response.mime_type.empty()) {
-        logger_->log_warn("Using default mime type of application/octet-stream for response body file: %s", response.uri);
-        response.mime_type = "application/octet-stream";
-      }
-      session->read(flow_file, &cb);
-      handler_->set_response_body(std::move(response));
+  if (type == "response_body" && handler_) {
+    response_body response;
+    ResponseBodyReadCallback cb(&response.body);
+    flow_file->getAttribute("filename", response.uri);
+    flow_file->getAttribute("mime.type", response.mime_type);
+    if (response.mime_type.empty()) {
+      logger_->log_warn("Using default mime type of application/octet-stream for response body file: %s", response.uri);
+      response.mime_type = "application/octet-stream";
     }
+    session->read(flow_file, &cb);
+    handler_->setResponseBody(std::move(response));
   }
 
   session->remove(flow_file);
 }
 
-ListenHTTP::Handler::Handler(std::string base_uri, core::ProcessContext *context, core::ProcessSessionFactory *session_factory, std::string &&auth_dn_regex, std::string &&header_as_attrs_regex)
+void ListenHTTP::processRequestBuffer(core::ProcessSession *session) {
+  std::size_t flow_file_count = 0;
+  while (batch_size_ == 0 || batch_size_ > flow_file_count) {

Review comment:
       ```suggestion
     for (std::size_t flow_file_count = 0; batch_size_ == 0 || batch_size_ > flow_file_count; ++flow_file_count) {
   ```
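   
   An illustrative shape for `processRequestBuffer` built around that loop, assuming the `request_buffer` queue exposes a `tryDequeue`-style call that returns false when empty (not the PR's exact body):
   ```cpp
   void ListenHTTP::processRequestBuffer(core::ProcessSession *session) {
     for (std::size_t flow_file_count = 0; batch_size_ == 0 || batch_size_ > flow_file_count; ++flow_file_count) {
       FlowFileBufferPair request;
       if (!request_buffer.tryDequeue(request)) {
         break;  // buffer drained before the batch limit was reached
       }
       // ... write request.second (the buffered request body) into request.first
       // and transfer the flow file to the Success relationship ...
     }
   }
   ```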

##########
File path: extensions/civetweb/processors/ListenHTTP.cpp
##########
@@ -62,6 +62,17 @@ core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as
                                                     " should be passed along as FlowFile attributes",
                                                     "");
 
+core::Property ListenHTTP::BatchSize(
+    core::PropertyBuilder::createProperty("Batch Size")
+        ->withDescription("Maximum number of buffered requests to be processed in a single batch. If set to zero all buffered requests are processed.")
+        ->withDefaultValue<std::size_t>(0)->build());
+
+core::Property ListenHTTP::BufferSize(
+    core::PropertyBuilder::createProperty("Buffer Size")
+        ->withDescription("Maximum number of HTTP Requests allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer full, the request is refused. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<std::size_t>(0)->build());
+

Review comment:
       I think a buffer size somewhere in the ballpark of 20k would be a reasonable compromise between supporting most use cases without rejecting requests (assuming a common 1/sec scheduling) and not using up all of the system's memory in case of insufficient throughput and flow file accumulation. I'd set batch size to the same so that we can process all of the buffered requests in one batch by default.
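   
   A sketch of how those suggested defaults could look with the same PropertyBuilder calls used in the diff; 20000 is the reviewer's ballpark figure, not a benchmarked value:
   ```cpp
   core::Property ListenHTTP::BatchSize(
       core::PropertyBuilder::createProperty("Batch Size")
           ->withDescription("Maximum number of buffered requests to be processed in a single batch. If set to zero all buffered requests are processed.")
           ->withDefaultValue<std::size_t>(20000)->build());

   core::Property ListenHTTP::BufferSize(
       core::PropertyBuilder::createProperty("Buffer Size")
           ->withDescription("Maximum number of HTTP Requests allowed to be buffered before processing them when the processor is triggered. "
                             "If the buffer is full, the request is refused. If set to zero the buffer is unlimited.")
           ->withDefaultValue<std::size_t>(20000)->build());
   ```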

##########
File path: extensions/civetweb/processors/ListenHTTP.h
##########
@@ -43,15 +43,17 @@ namespace processors {
 class ListenHTTP : public core::Processor {
  public:
 
+  using FlowFileBufferPair=std::pair<std::shared_ptr<core::FlowFile>, std::shared_ptr<io::BufferStream>>;

Review comment:
       I usually prefer to keep the types as specific as possible to avoid issues down the road, as upcasting is always easier and safer than downcasting. Here, this means using FlowFileRecord instead of FlowFile.
   ```suggestion
     using FlowFileBufferPair=std::pair<std::shared_ptr<FlowFileRecord>, std::shared_ptr<io::BufferStream>>;
   ```
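   
   A standalone illustration of the upcast/downcast asymmetry behind that preference; `Base`/`Derived` stand in for `core::FlowFile`/`FlowFileRecord`:
   ```cpp
   #include <memory>

   struct Base { virtual ~Base() = default; };
   struct Derived : Base {};

   int main() {
     std::shared_ptr<Derived> specific = std::make_shared<Derived>();
     std::shared_ptr<Base> general = specific;  // upcast: implicit and always safe

     // downcast: needs an explicit cast and can yield nullptr at runtime
     std::shared_ptr<Derived> back = std::dynamic_pointer_cast<Derived>(general);
   }
   ```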




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

