szaszm commented on code in PR #1826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1826#discussion_r1803050660


##########
extensions/civetweb/processors/ListenHTTP.cpp:
##########
@@ -191,52 +210,95 @@ bool 
ListenHTTP::processIncomingFlowFile(core::ProcessSession &session) {
   std::string type;
   flow_file->getAttribute("http.type", type);
 
-  if (type == "response_body" && handler_) {
-    ResponseBody response;
-    flow_file->getAttribute("filename", response.uri);
-    flow_file->getAttribute("mime.type", response.mime_type);
-    if (response.mime_type.empty()) {
-      logger_->log_warn("Using default mime type of application/octet-stream 
for response body file: {}", response.uri);
-      response.mime_type = "application/octet-stream";
-    }
-    response.body = session.readBuffer(flow_file).buffer;
-    handler_->setResponseBody(response);
+  if (type == "response_body" && handler_ && processFlowFile(flow_file)) {
+    session.transfer(flow_file, Self);
+  } else {
+    session.remove(flow_file);
   }
 
-  session.remove(flow_file);
   return true;
 }
 
+bool ListenHTTP::processFlowFile(const std::shared_ptr<core::FlowFile>& 
flow_file) {
+  ResponseBody response;
+  flow_file->getAttribute("filename", response.uri);
+  flow_file->getAttribute("mime.type", response.mime_type);
+  if (response.mime_type.empty()) {
+    logger_->log_warn("Using default mime type of application/octet-stream for 
response body file: {}", response.uri);
+    response.mime_type = "application/octet-stream";
+  }
+
+  response.flow_file = flow_file;
+  return handler_->setResponseBody(response);
+}
+
 /// @return Whether there was a request processed
 bool ListenHTTP::processRequestBuffer(core::ProcessSession& session) {
   gsl_Expects(handler_);
   std::size_t flow_file_count = 0;
   for (; batch_size_ == 0 || batch_size_ > flow_file_count; ++flow_file_count) 
{
-    FlowFileBufferPair flow_file_buffer_pair;
-    if (!handler_->dequeueRequest(flow_file_buffer_pair)) {
+    Handler::Request req;
+    if (!handler_->dequeueRequest(req)) {
       break;
     }
 
-    auto flow_file = flow_file_buffer_pair.first;
-    session.add(flow_file);
-
-    if (flow_file_buffer_pair.second) {
-      session.writeBuffer(flow_file, 
flow_file_buffer_pair.second->getBuffer());
-    }
-
-    session.transfer(flow_file, Success);
+    [&] {
+      std::promise<void> req_done_promise;
+      auto res = req_done_promise.get_future();
+      req.set_value(Handler::RequestValue{std::ref(session), 
std::move(req_done_promise)});
+      return res;
+    }().wait();
   }
 
   logger_->log_debug("ListenHTTP transferred {} flow files from HTTP request 
buffer", flow_file_count);
   return flow_file_count > 0;
 }
 
-ListenHTTP::Handler::Handler(std::string base_uri, core::ProcessContext 
*context, std::string &&auth_dn_regex, std::optional<utils::Regex> 
&&headers_as_attrs_regex)
+namespace {
+
+class MgConnectionInputStream : public io::InputStream {
+ public:
+  MgConnectionInputStream(struct mg_connection* conn, std::optional<size_t> 
size): conn_(conn), size_(size) {}
+
+  size_t read(std::span<std::byte> out_buffer) override {
+    const auto mg_read_return = mg_read(conn_, out_buffer.data(), 
std::min(out_buffer.size(), size_.value_or(std::numeric_limits<size_t>::max()) 
- offset_));
+    if (mg_read_return < 0) {
+      return io::STREAM_ERROR;
+    }
+    offset_ += gsl::narrow<size_t>(mg_read_return);
+    return gsl::narrow<size_t>(mg_read_return);
+  }
+
+ private:
+  struct mg_connection* conn_;
+  size_t offset_{0};
+  std::optional<size_t> size_;

Review Comment:
   I found these a bit confusing: whose offsets are these? I realized they refer
to the network stream, so I'd rename them.
   ```suggestion
     size_t netstream_offset_{0};  // how much has been read from conn_
     std::optional<size_t> netstream_size_limit_;  // how much can we read from 
conn_
   ```



##########
extensions/civetweb/processors/ListenHTTP.cpp:
##########
@@ -191,52 +210,95 @@ bool 
ListenHTTP::processIncomingFlowFile(core::ProcessSession &session) {
   std::string type;
   flow_file->getAttribute("http.type", type);
 
-  if (type == "response_body" && handler_) {
-    ResponseBody response;
-    flow_file->getAttribute("filename", response.uri);
-    flow_file->getAttribute("mime.type", response.mime_type);
-    if (response.mime_type.empty()) {
-      logger_->log_warn("Using default mime type of application/octet-stream 
for response body file: {}", response.uri);
-      response.mime_type = "application/octet-stream";
-    }
-    response.body = session.readBuffer(flow_file).buffer;
-    handler_->setResponseBody(response);
+  if (type == "response_body" && handler_ && processFlowFile(flow_file)) {
+    session.transfer(flow_file, Self);
+  } else {
+    session.remove(flow_file);
   }
 
-  session.remove(flow_file);
   return true;
 }
 
+bool ListenHTTP::processFlowFile(const std::shared_ptr<core::FlowFile>& 
flow_file) {
+  ResponseBody response;
+  flow_file->getAttribute("filename", response.uri);
+  flow_file->getAttribute("mime.type", response.mime_type);
+  if (response.mime_type.empty()) {
+    logger_->log_warn("Using default mime type of application/octet-stream for 
response body file: {}", response.uri);
+    response.mime_type = "application/octet-stream";
+  }
+
+  response.flow_file = flow_file;
+  return handler_->setResponseBody(response);
+}
+
 /// @return Whether there was a request processed
 bool ListenHTTP::processRequestBuffer(core::ProcessSession& session) {
   gsl_Expects(handler_);
   std::size_t flow_file_count = 0;
   for (; batch_size_ == 0 || batch_size_ > flow_file_count; ++flow_file_count) 
{
-    FlowFileBufferPair flow_file_buffer_pair;
-    if (!handler_->dequeueRequest(flow_file_buffer_pair)) {
+    Handler::Request req;
+    if (!handler_->dequeueRequest(req)) {
       break;
     }
 
-    auto flow_file = flow_file_buffer_pair.first;
-    session.add(flow_file);
-
-    if (flow_file_buffer_pair.second) {
-      session.writeBuffer(flow_file, 
flow_file_buffer_pair.second->getBuffer());
-    }
-
-    session.transfer(flow_file, Success);
+    [&] {
+      std::promise<void> req_done_promise;
+      auto res = req_done_promise.get_future();
+      req.set_value(Handler::RequestValue{std::ref(session), 
std::move(req_done_promise)});
+      return res;
+    }().wait();
   }
 
   logger_->log_debug("ListenHTTP transferred {} flow files from HTTP request 
buffer", flow_file_count);
   return flow_file_count > 0;
 }
 
-ListenHTTP::Handler::Handler(std::string base_uri, core::ProcessContext 
*context, std::string &&auth_dn_regex, std::optional<utils::Regex> 
&&headers_as_attrs_regex)
+namespace {
+
+class MgConnectionInputStream : public io::InputStream {
+ public:
+  MgConnectionInputStream(struct mg_connection* conn, std::optional<size_t> 
size): conn_(conn), size_(size) {}
+
+  size_t read(std::span<std::byte> out_buffer) override {
+    const auto mg_read_return = mg_read(conn_, out_buffer.data(), 
std::min(out_buffer.size(), size_.value_or(std::numeric_limits<size_t>::max()) 
- offset_));
+    if (mg_read_return < 0) {
+      return io::STREAM_ERROR;
+    }
+    offset_ += gsl::narrow<size_t>(mg_read_return);

Review Comment:
   ```suggestion
       const auto read_size_limit = 
netstream_size_limit_.value_or(std::numeric_limits<size_t>::max()) - 
netstream_offset_;
       const auto limited_out_buf = out_buffer.subspan(0, 
std::min(out_buffer.size(), read_size_limit));
       const auto mg_read_return = mg_read(conn_, limited_out_buf.data(), 
limited_out_buf.size());
       if (mg_read_return < 0) {
         return io::STREAM_ERROR;
       }
       netstream_offset_ += gsl::narrow<size_t>(mg_read_return);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to