lordgamez commented on code in PR #1826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1826#discussion_r1862389269
##########
extensions/civetweb/processors/ListenHTTP.cpp:
##########
@@ -191,52 +208,97 @@ bool
ListenHTTP::processIncomingFlowFile(core::ProcessSession &session) {
std::string type;
flow_file->getAttribute("http.type", type);
- if (type == "response_body" && handler_) {
- ResponseBody response;
- flow_file->getAttribute("filename", response.uri);
- flow_file->getAttribute("mime.type", response.mime_type);
- if (response.mime_type.empty()) {
- logger_->log_warn("Using default mime type of application/octet-stream
for response body file: {}", response.uri);
- response.mime_type = "application/octet-stream";
- }
- response.body = session.readBuffer(flow_file).buffer;
- handler_->setResponseBody(response);
+ if (type == "response_body" && handler_ && processFlowFile(flow_file)) {
+ session.transfer(flow_file, Self);
+ } else {
+ session.remove(flow_file);
}
- session.remove(flow_file);
return true;
}
+bool ListenHTTP::processFlowFile(const std::shared_ptr<core::FlowFile>&
flow_file) {
+ ResponseBody response;
+ flow_file->getAttribute("filename", response.uri);
+ flow_file->getAttribute("mime.type", response.mime_type);
+ if (response.mime_type.empty()) {
+ logger_->log_warn("Using default mime type of application/octet-stream for
response body file: {}", response.uri);
+ response.mime_type = "application/octet-stream";
+ }
+
+ response.flow_file = flow_file;
+ return handler_->setResponseBody(response);
+}
+
/// @return Whether there was a request processed
bool ListenHTTP::processRequestBuffer(core::ProcessSession& session) {
gsl_Expects(handler_);
std::size_t flow_file_count = 0;
for (; batch_size_ == 0 || batch_size_ > flow_file_count; ++flow_file_count)
{
- FlowFileBufferPair flow_file_buffer_pair;
- if (!handler_->dequeueRequest(flow_file_buffer_pair)) {
+ Handler::Request req;
+ if (!handler_->dequeueRequest(req)) {
break;
}
- auto flow_file = flow_file_buffer_pair.first;
- session.add(flow_file);
-
- if (flow_file_buffer_pair.second) {
- session.writeBuffer(flow_file,
flow_file_buffer_pair.second->getBuffer());
- }
-
- session.transfer(flow_file, Success);
+ [&] {
+ std::promise<void> req_done_promise;
+ auto res = req_done_promise.get_future();
+ req.set_value(Handler::RequestValue{std::ref(session),
std::move(req_done_promise)});
+ return res;
+ }().wait();
}
logger_->log_debug("ListenHTTP transferred {} flow files from HTTP request
buffer", flow_file_count);
return flow_file_count > 0;
}
-ListenHTTP::Handler::Handler(std::string base_uri, core::ProcessContext
*context, std::string &&auth_dn_regex, std::optional<utils::Regex>
&&headers_as_attrs_regex)
+namespace {
+
+class MgConnectionInputStream : public io::InputStream {
+ public:
+ MgConnectionInputStream(struct mg_connection* conn, std::optional<size_t>
size): conn_(conn), netstream_size_limit_(size) {}
+
+ size_t read(std::span<std::byte> out_buffer) override {
+ const auto read_size_limit =
netstream_size_limit_.value_or(std::numeric_limits<size_t>::max()) -
netstream_offset_;
+ const auto limited_out_buf = out_buffer.subspan(0,
std::min(out_buffer.size(), read_size_limit));
+ const auto mg_read_return = mg_read(conn_, limited_out_buf.data(),
limited_out_buf.size());
+ if (mg_read_return <= 0) {
+ return 0;
+ }
+ netstream_offset_ += gsl::narrow<size_t>(mg_read_return);
+ return gsl::narrow<size_t>(mg_read_return);
+ }
+
+ private:
+ struct mg_connection* conn_;
+ size_t netstream_offset_{0}; // how much has been read from conn_
+ std::optional<size_t> netstream_size_limit_; // how much can we read from
conn_
Review Comment:
Should this be optional? It seems that we always have a specific message
size when using this input stream for message processing.
##########
extensions/civetweb/processors/ListenHTTP.h:
##########
@@ -165,28 +192,66 @@ class ListenHTTP : public core::Processor {
* Sets a static response body string to be used for a given URI, with a
number of seconds it will be kept in memory.
* @param response
*/
- void setResponseBody(const ResponseBody& response);
+ bool setResponseBody(const ResponseBody& response);
+
+ bool dequeueRequest(Request& req);
+
+ size_t requestCount() const {
+ return request_buffer_.size();
+ }
+
+ bool empty() const {
+ return request_buffer_.empty();
+ }
- bool dequeueRequest(FlowFileBufferPair &flow_file_buffer_pair);
+ void stop() {
+ request_buffer_.stop();
+ Request req;
+ while (dequeueRequest(req)) {
+ std::promise<void> req_done_promise;
+ auto req_done = req_done_promise.get_future();
+
req.set_value(nonstd::make_unexpected(FailureValue{Handler::FailureReason::PROCESSOR_SHUTDOWN,
std::move(req_done_promise)}));
+ req_done.wait();
+ }
+ }
private:
static void sendHttp500(struct mg_connection *conn);
static void sendHttp503(struct mg_connection *conn);
bool authRequest(mg_connection *conn, const mg_request_info *req_info)
const;
void setHeaderAttributes(const mg_request_info *req_info, core::FlowFile&
flow_file) const;
- void writeBody(mg_connection *conn, const mg_request_info *req_info, bool
include_payload = true);
- static std::unique_ptr<io::BufferStream> createContentBuffer(struct
mg_connection *conn, const struct mg_request_info *req_info);
- void enqueueRequest(mg_connection *conn, const mg_request_info *req_info,
std::unique_ptr<io::BufferStream>);
+ void writeBody(core::ProcessSession* payload_reader, mg_connection *conn,
const mg_request_info *req_info);
+ void enqueueRequest(mg_connection *conn, const mg_request_info *req_info,
bool write_body);
+
+ class RequestBuffer : public utils::ConcurrentQueue<Request> {
Review Comment:
I think we could avoid the inheritance and use a wrapper around the
concurrent queue instead. This way we don't need to modify the ConcurrentQueue
and make its enqueue method virtual; the wrapper can simply act as a proxy for
the few methods that are used by the Handler class. We can also replace
`running_` with an atomic bool variable, so no additional locks are needed in
the wrapper.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]