lordgamez commented on a change in pull request #921:
URL: https://github.com/apache/nifi-minifi-cpp/pull/921#discussion_r504052023
##########
File path: extensions/civetweb/processors/ListenHTTP.cpp
##########
@@ -191,7 +206,15 @@ void ListenHTTP::onSchedule(core::ProcessContext *context,
core::ProcessSessionF
}
server_.reset(new CivetServer(options, &callbacks_, &logger_));
- handler_.reset(new Handler(basePath, context, sessionFactory,
std::move(authDNPattern), std::move(headersAsAttributesPattern)));
+
+ context->getProperty(BatchSize.getName(), batch_size_);
+ logger_->log_debug("ListenHTTP using %s: %d", BatchSize.getName(),
batch_size_);
+
+ std::size_t buffer_size;
+ context->getProperty(BufferSize.getName(), buffer_size);
+ logger_->log_debug("ListenHTTP using %s: %d", BufferSize.getName(),
buffer_size);
+
+ handler_.reset(new Handler(basePath, context, std::move(authDNPattern),
std::move(headersAsAttributesPattern), buffer_size));
Review comment:
Good point, fixed in
[0427909](https://github.com/apache/nifi-minifi-cpp/pull/921/commits/0427909155e707881d8640ca20e1c33b0c9d84f5)
##########
File path: extensions/civetweb/processors/ListenHTTP.cpp
##########
@@ -212,51 +235,80 @@ void ListenHTTP::onSchedule(core::ProcessContext
*context, core::ProcessSessionF
ListenHTTP::~ListenHTTP() = default;
void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession
*session) {
- std::shared_ptr<core::FlowFile> flow_file = session->get();
+ logger_->log_debug("OnTrigger ListenHTTP");
+ processIncomingFlowFile(session);
+ processRequestBuffer(session);
+}
- // Do nothing if there are no incoming files
+void ListenHTTP::processIncomingFlowFile(core::ProcessSession *session) {
+ std::shared_ptr<core::FlowFile> flow_file = session->get();
if (!flow_file) {
return;
}
std::string type;
flow_file->getAttribute("http.type", type);
- if (type == "response_body") {
-
- if (handler_) {
- struct response_body response { "", "", "" };
- ResponseBodyReadCallback cb(&response.body);
- flow_file->getAttribute("filename", response.uri);
- flow_file->getAttribute("mime.type", response.mime_type);
- if (response.mime_type.empty()) {
- logger_->log_warn("Using default mime type of application/octet-stream
for response body file: %s", response.uri);
- response.mime_type = "application/octet-stream";
- }
- session->read(flow_file, &cb);
- handler_->set_response_body(std::move(response));
+ if (type == "response_body" && handler_) {
+ response_body response;
+ ResponseBodyReadCallback cb(&response.body);
+ flow_file->getAttribute("filename", response.uri);
+ flow_file->getAttribute("mime.type", response.mime_type);
+ if (response.mime_type.empty()) {
+ logger_->log_warn("Using default mime type of application/octet-stream
for response body file: %s", response.uri);
+ response.mime_type = "application/octet-stream";
}
+ session->read(flow_file, &cb);
+ handler_->setResponseBody(std::move(response));
}
session->remove(flow_file);
}
-ListenHTTP::Handler::Handler(std::string base_uri, core::ProcessContext
*context, core::ProcessSessionFactory *session_factory, std::string
&&auth_dn_regex, std::string &&header_as_attrs_regex)
+void ListenHTTP::processRequestBuffer(core::ProcessSession *session) {
+ std::size_t flow_file_count = 0;
+ for (; batch_size_ == 0 || batch_size_ > flow_file_count; ++flow_file_count)
{
+ FlowFileBufferPair flow_file_buffer_pair;
+ if (!handler_->request_buffer.tryDequeue(flow_file_buffer_pair)) {
+ break;
Review comment:
This can happen only if the queue becomes empty before we reach batch
size. It is a normal use case and we log the number of flow files we dequeued
after the loop so I think it shouldn't be needed to additionally log here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]