szaszm commented on a change in pull request #921: URL: https://github.com/apache/nifi-minifi-cpp/pull/921#discussion_r503353221
########## File path: extensions/civetweb/processors/ListenHTTP.cpp ########## @@ -434,16 +456,11 @@ void ListenHTTP::Handler::write_body(mg_connection *conn, const mg_request_info } } -ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo) - : logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger()) { - conn_ = conn; - req_info_ = reqInfo; -} - -int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) { +std::shared_ptr<io::BufferStream> ListenHTTP::Handler::createContentBuffer(struct mg_connection *conn, const struct mg_request_info *req_info) { + auto content_buffer = std::make_shared<io::BufferStream>(); Review comment: I don't think we need any sharing here, only a few ownership transfers: createContentBuffer -> handlePost (temporary) -> enqueueRequest -> request buffer -> onTrigger ```suggestion auto content_buffer = utils::make_unique<io::BufferStream>(); ``` ########## File path: extensions/civetweb/processors/ListenHTTP.cpp ########## @@ -273,6 +325,34 @@ void ListenHTTP::Handler::set_header_attributes(const mg_request_info *req_info, } } +bool ListenHTTP::Handler::enqueueRequest(mg_connection *conn, const mg_request_info *req_info, std::shared_ptr<io::BufferStream> content_buffer) { + auto flow_file = std::make_shared<FlowFileRecord>(); + auto flow_version = process_context_->getProcessorNode()->getFlowIdentifier(); + if (flow_version != nullptr) { + flow_file->setAttribute(core::SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId()); + } + + if (!flow_file) { + sendHttp500(conn); + return true; + } + + setHeaderAttributes(req_info, flow_file); + + if (buffer_size_ == 0 || request_buffer.size() < buffer_size_) { + request_buffer.enqueue(std::make_pair(std::move(flow_file), std::move(content_buffer))); + } else { + logger_->log_error("ListenHTTP buffer is full"); Review comment: I would only log a warning, because the other endpoint can always choose to retry 
later in response to HTTP 503, leaving the system functional. ########## File path: extensions/civetweb/processors/ListenHTTP.cpp ########## @@ -212,51 +233,82 @@ void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionF ListenHTTP::~ListenHTTP() = default; void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { - std::shared_ptr<core::FlowFile> flow_file = session->get(); + logger_->log_debug("OnTrigger ListenHTTP"); + processIncomingFlowFile(session); + processRequestBuffer(session); +} - // Do nothing if there are no incoming files +void ListenHTTP::processIncomingFlowFile(core::ProcessSession *session) { + std::shared_ptr<core::FlowFile> flow_file = session->get(); if (!flow_file) { return; } std::string type; flow_file->getAttribute("http.type", type); - if (type == "response_body") { - - if (handler_) { - struct response_body response { "", "", "" }; - ResponseBodyReadCallback cb(&response.body); - flow_file->getAttribute("filename", response.uri); - flow_file->getAttribute("mime.type", response.mime_type); - if (response.mime_type.empty()) { - logger_->log_warn("Using default mime type of application/octet-stream for response body file: %s", response.uri); - response.mime_type = "application/octet-stream"; - } - session->read(flow_file, &cb); - handler_->set_response_body(std::move(response)); + if (type == "response_body" && handler_) { + response_body response; + ResponseBodyReadCallback cb(&response.body); + flow_file->getAttribute("filename", response.uri); + flow_file->getAttribute("mime.type", response.mime_type); + if (response.mime_type.empty()) { + logger_->log_warn("Using default mime type of application/octet-stream for response body file: %s", response.uri); + response.mime_type = "application/octet-stream"; } + session->read(flow_file, &cb); + handler_->setResponseBody(std::move(response)); } session->remove(flow_file); } -ListenHTTP::Handler::Handler(std::string base_uri, 
core::ProcessContext *context, core::ProcessSessionFactory *session_factory, std::string &&auth_dn_regex, std::string &&header_as_attrs_regex) +void ListenHTTP::processRequestBuffer(core::ProcessSession *session) { + std::size_t flow_file_count = 0; + while (batch_size_ == 0 || batch_size_ > flow_file_count) { Review comment: ```suggestion for (std::size_t flow_file_count = 0; batch_size_ == 0 || batch_size_ > flow_file_count; ++flow_file_count) { ``` ########## File path: extensions/civetweb/processors/ListenHTTP.cpp ########## @@ -62,6 +62,17 @@ core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as " should be passed along as FlowFile attributes", ""); +core::Property ListenHTTP::BatchSize( + core::PropertyBuilder::createProperty("Batch Size") + ->withDescription("Maximum number of buffered requests to be processed in a single batch. If set to zero all buffered requests are processed.") + ->withDefaultValue<std::size_t>(0)->build()); + +core::Property ListenHTTP::BufferSize( + core::PropertyBuilder::createProperty("Buffer Size") + ->withDescription("Maximum number of HTTP Requests allowed to be buffered before processing them when the processor is triggered. " + "If the buffer full, the request is refused. If set to zero the buffer is unlimited.") + ->withDefaultValue<std::size_t>(0)->build()); + Review comment: I think a buffer size somewhere in the ballpark of 20k would be a reasonable compromise between supporting most use cases without rejecting requests (assuming a common 1/sec scheduling) and not using up all of the system's memory in case of insufficient throughput and flow file accumulation. I'd set batch size to the same so that we can process all of the buffered requests in one batch by default. 
########## File path: extensions/civetweb/processors/ListenHTTP.h ########## @@ -43,15 +43,17 @@ namespace processors { class ListenHTTP : public core::Processor { public: + using FlowFileBufferPair=std::pair<std::shared_ptr<core::FlowFile>, std::shared_ptr<io::BufferStream>>; Review comment: I usually prefer to keep the types as specific as possible to avoid issues down the road, as upcasting is always easier and safer than downcasting. Here, this means using FlowFileRecord instead of FlowFile. (If the content buffer is changed to `std::unique_ptr` as suggested for `createContentBuffer`, the second member of the pair should become `std::unique_ptr<io::BufferStream>` as well.) ```suggestion using FlowFileBufferPair=std::pair<std::shared_ptr<FlowFileRecord>, std::shared_ptr<io::BufferStream>>; ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org