szaszm commented on code in PR #1826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1826#discussion_r1865786532
##########
extensions/civetweb/processors/ListenHTTP.cpp:
##########
@@ -269,25 +331,48 @@ void ListenHTTP::Handler::setHeaderAttributes(const
mg_request_info *req_info, c
}
}
-void ListenHTTP::Handler::enqueueRequest(mg_connection *conn, const
mg_request_info *req_info, std::unique_ptr<io::BufferStream> content_buffer) {
- auto flow_file = std::make_shared<FlowFileRecord>();
- auto flow_version =
process_context_->getProcessorNode()->getFlowIdentifier();
- if (flow_version != nullptr) {
- flow_file->setAttribute(core::SpecialFlowAttribute::FLOW_ID,
flow_version->getFlowId());
+void ListenHTTP::Handler::enqueueRequest(mg_connection *conn, const
mg_request_info *req_info, bool write_body) {
+ if (buffer_size_ != 0 && request_buffer_.size() >= buffer_size_) {
+ logger_->log_warn("ListenHTTP buffer is full, '{}' request for '{}' uri
was dropped", req_info->request_method, req_info->request_uri);
+ sendHttp503(conn);
+ return;
+ } else {
+ logger_->log_warn("ListenHTTP buffer is NOT full {}/{}, '{}' request for
'{}' uri was dropped", request_buffer_.size() + 1, buffer_size_,
req_info->request_method, req_info->request_uri);
}
- setHeaderAttributes(req_info, *flow_file);
+ Request req;
+ auto req_triggered = req.get_future();
- if (buffer_size_ == 0 || request_buffer_.size() < buffer_size_) {
- request_buffer_.enqueue(std::make_pair(std::move(flow_file),
std::move(content_buffer)));
- } else {
- logger_->log_warn("ListenHTTP buffer is full, '{}' request for '{}' uri
was dropped", req_info->request_method, req_info->request_uri);
+ request_buffer_.enqueue(std::move(req));
+
+ auto req_result = req_triggered.get();
+ if (!req_result) {
sendHttp503(conn);
+ req_result.error().ret.set_value();
return;
}
+ auto& [session, req_done] = *req_result;
+
+ auto flow_file = session.get().create();
+ if (flow_id_) {
+ flow_file->setAttribute(core::SpecialFlowAttribute::FLOW_ID,
flow_id_.value());
+ }
+
+ if (write_body) {
+ session.get().write(flow_file, [&] (auto& out) {
+ MgConnectionInputStream mg_body{conn, req_info->content_length};
Review Comment:
```suggestion
std::optional<size_t> request_size = std::nullopt;
if (req_info->content_length > 0) { request_size =
gsl::narrow<size_t>(req_info->content_length); }
MgConnectionInputStream mg_body{conn, request_size};
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]