This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 24cfd6575 MINFICPP-2243 ListenHTTP: process requests in onTrigger
24cfd6575 is described below
commit 24cfd6575ec4b6f1ce23fee576a626695b65f65f
Author: Adam Debreceni <[email protected]>
AuthorDate: Wed Dec 4 15:18:11 2024 +0100
MINFICPP-2243 ListenHTTP: process requests in onTrigger
Network calls are now only made within onTrigger, instead of reading
data on a background thread and storing it in memory. The incoming data
is stored in the kernel buffers before we read it out.
With these changes, we avoid keeping incoming data in memory, and we can
avoid creating flow files without a normal onTrigger session.
Closes #1826 "In ListenHTTP process incoming request only in onTrigger"
Signed-off-by: Marton Szasz <[email protected]>
---
extensions/civetweb/processors/ListenHTTP.cpp | 244 ++++++++++++------
extensions/civetweb/processors/ListenHTTP.h | 77 +++++-
extensions/civetweb/tests/ListenHTTPTests.cpp | 100 +++++---
.../tests/integration/InvokeHTTPTests.cpp | 285 +++++----------------
libminifi/include/http/HTTPClient.h | 4 +
libminifi/include/utils/MinifiConcurrentQueue.h | 2 +
6 files changed, 357 insertions(+), 355 deletions(-)
diff --git a/extensions/civetweb/processors/ListenHTTP.cpp
b/extensions/civetweb/processors/ListenHTTP.cpp
index 9c1a15379..b50e78b57 100644
--- a/extensions/civetweb/processors/ListenHTTP.cpp
+++ b/extensions/civetweb/processors/ListenHTTP.cpp
@@ -151,7 +151,12 @@ void ListenHTTP::onSchedule(core::ProcessContext& context,
core::ProcessSessionF
context.getProperty(BatchSize, batch_size_);
logger_->log_debug("ListenHTTP using {}: {}", BatchSize.name, batch_size_);
- handler_ = std::make_unique<Handler>(basePath, &context,
std::move(authDNPattern),
+ std::optional<std::string> flow_id;
+ if (auto flow_version = context.getProcessorNode()->getFlowIdentifier()) {
+ flow_id = flow_version->getFlowId();
+ }
+
+ handler_ = std::make_unique<Handler>(basePath, flow_id,
context.getProperty<uint64_t>(BufferSize).value_or(0), std::move(authDNPattern),
headersAsAttributesPattern.empty() ? std::nullopt :
std::make_optional<utils::Regex>(headersAsAttributesPattern));
server_->addHandler(basePath, handler_.get());
@@ -174,13 +179,25 @@ ListenHTTP::~ListenHTTP() = default;
void ListenHTTP::onTrigger(core::ProcessContext&, core::ProcessSession&
session) {
logger_->log_trace("OnTrigger ListenHTTP");
+ bool restored_processed = false;
+ for (auto& ff : file_store_.getNewFlowFiles()) {
+ restored_processed = true;
+ if (!processFlowFile(ff)) {
+ session.remove(ff);
+ }
+ }
const bool incoming_processed = processIncomingFlowFile(session);
const bool request_processed = processRequestBuffer(session);
- if (!incoming_processed && !request_processed) {
+ if (!restored_processed && !incoming_processed && !request_processed) {
yield();
}
}
+void ListenHTTP::restore(const std::shared_ptr<core::FlowFile>& flowFile) {
+ if (!flowFile) return;
+ file_store_.put(flowFile);
+}
+
/// @return Whether there was a flow file processed.
bool ListenHTTP::processIncomingFlowFile(core::ProcessSession &session) {
std::shared_ptr<core::FlowFile> flow_file = session.get();
@@ -191,52 +208,97 @@ bool
ListenHTTP::processIncomingFlowFile(core::ProcessSession &session) {
std::string type;
flow_file->getAttribute("http.type", type);
- if (type == "response_body" && handler_) {
- ResponseBody response;
- flow_file->getAttribute("filename", response.uri);
- flow_file->getAttribute("mime.type", response.mime_type);
- if (response.mime_type.empty()) {
- logger_->log_warn("Using default mime type of application/octet-stream
for response body file: {}", response.uri);
- response.mime_type = "application/octet-stream";
- }
- response.body = session.readBuffer(flow_file).buffer;
- handler_->setResponseBody(response);
+ if (type == "response_body" && handler_ && processFlowFile(flow_file)) {
+ session.transfer(flow_file, Self);
+ } else {
+ session.remove(flow_file);
}
- session.remove(flow_file);
return true;
}
+bool ListenHTTP::processFlowFile(const std::shared_ptr<core::FlowFile>&
flow_file) {
+ ResponseBody response;
+ flow_file->getAttribute("filename", response.uri);
+ flow_file->getAttribute("mime.type", response.mime_type);
+ if (response.mime_type.empty()) {
+ logger_->log_warn("Using default mime type of application/octet-stream for
response body file: {}", response.uri);
+ response.mime_type = "application/octet-stream";
+ }
+
+ response.flow_file = flow_file;
+ return handler_->setResponseBody(response);
+}
+
/// @return Whether there was a request processed
bool ListenHTTP::processRequestBuffer(core::ProcessSession& session) {
gsl_Expects(handler_);
std::size_t flow_file_count = 0;
for (; batch_size_ == 0 || batch_size_ > flow_file_count; ++flow_file_count)
{
- FlowFileBufferPair flow_file_buffer_pair;
- if (!handler_->dequeueRequest(flow_file_buffer_pair)) {
+ Handler::Request req;
+ if (!handler_->dequeueRequest(req)) {
break;
}
- auto flow_file = flow_file_buffer_pair.first;
- session.add(flow_file);
-
- if (flow_file_buffer_pair.second) {
- session.writeBuffer(flow_file,
flow_file_buffer_pair.second->getBuffer());
- }
-
- session.transfer(flow_file, Success);
+ [&] {
+ std::promise<void> req_done_promise;
+ auto res = req_done_promise.get_future();
+ req.set_value(Handler::RequestValue{std::ref(session),
std::move(req_done_promise)});
+ return res;
+ }().wait();
}
logger_->log_debug("ListenHTTP transferred {} flow files from HTTP request
buffer", flow_file_count);
return flow_file_count > 0;
}
-ListenHTTP::Handler::Handler(std::string base_uri, core::ProcessContext
*context, std::string &&auth_dn_regex, std::optional<utils::Regex>
&&headers_as_attrs_regex)
+namespace {
+
+class MgConnectionInputStream : public io::InputStream {
+ public:
+ MgConnectionInputStream(struct mg_connection* conn, std::optional<size_t>
size): conn_(conn), netstream_size_limit_(size) {}
+
+ size_t read(std::span<std::byte> out_buffer) override {
+ const auto read_size_limit =
netstream_size_limit_.value_or(std::numeric_limits<size_t>::max()) -
netstream_offset_;
+ const auto limited_out_buf = out_buffer.subspan(0,
std::min(out_buffer.size(), read_size_limit));
+ const auto mg_read_return = mg_read(conn_, limited_out_buf.data(),
limited_out_buf.size());
+ if (mg_read_return <= 0) {
+ return 0;
+ }
+ netstream_offset_ += gsl::narrow<size_t>(mg_read_return);
+ return gsl::narrow<size_t>(mg_read_return);
+ }
+
+ private:
+ struct mg_connection* conn_;
+ size_t netstream_offset_{0}; // how much has been read from conn_
+ std::optional<size_t> netstream_size_limit_; // how much can we read from
conn_
+};
+
+class MgConnectionOutputStream : public io::OutputStream {
+ public:
+ explicit MgConnectionOutputStream(struct mg_connection* conn): conn_(conn) {}
+
+ size_t write(const uint8_t *value, size_t len) override {
+ const auto mg_write_return = mg_write(conn_, reinterpret_cast<const
void*>(value), len);
+ if (mg_write_return <= 0) {
+ return io::STREAM_ERROR;
+ }
+ return gsl::narrow<size_t>(mg_write_return);
+ }
+
+ private:
+ struct mg_connection* conn_;
+};
+
+} // namespace
+
+ListenHTTP::Handler::Handler(std::string base_uri, std::optional<std::string>
flow_id, uint64_t buffer_size, std::string &&auth_dn_regex,
std::optional<utils::Regex> &&headers_as_attrs_regex)
: base_uri_(std::move(base_uri)),
+ flow_id_(std::move(flow_id)),
auth_dn_regex_(std::move(auth_dn_regex)),
headers_as_attrs_regex_(std::move(headers_as_attrs_regex)),
- process_context_(context) {
- context->getProperty(BufferSize, buffer_size_);
+ buffer_size_(buffer_size) {
logger_->log_debug("ListenHTTP using {}: {}", BufferSize.name, buffer_size_);
}
@@ -269,25 +331,58 @@ void ListenHTTP::Handler::setHeaderAttributes(const
mg_request_info *req_info, c
}
}
-void ListenHTTP::Handler::enqueueRequest(mg_connection *conn, const
mg_request_info *req_info, std::unique_ptr<io::BufferStream> content_buffer) {
- auto flow_file = std::make_shared<FlowFileRecord>();
- auto flow_version =
process_context_->getProcessorNode()->getFlowIdentifier();
- if (flow_version != nullptr) {
- flow_file->setAttribute(core::SpecialFlowAttribute::FLOW_ID,
flow_version->getFlowId());
+void ListenHTTP::Handler::enqueueRequest(mg_connection *conn, const
mg_request_info *req_info, bool write_body) {
+ if (buffer_size_ != 0 && request_buffer_.size() >= buffer_size_) {
+ logger_->log_warn("ListenHTTP buffer is full, '{}' request for '{}' uri
was dropped", req_info->request_method, req_info->request_uri);
+ sendHttp503(conn);
+ return;
+ } else {
+ logger_->log_warn("ListenHTTP buffer is NOT full {}/{}, '{}' request for
'{}' uri was dropped", request_buffer_.size() + 1, buffer_size_,
req_info->request_method, req_info->request_uri);
}
- setHeaderAttributes(req_info, *flow_file);
+ Request req;
+ auto req_triggered = req.get_future();
- if (buffer_size_ == 0 || request_buffer_.size() < buffer_size_) {
- request_buffer_.enqueue(std::make_pair(std::move(flow_file),
std::move(content_buffer)));
- } else {
- logger_->log_warn("ListenHTTP buffer is full, '{}' request for '{}' uri
was dropped", req_info->request_method, req_info->request_uri);
+ {
+ std::lock_guard lock(request_mtx_);
+ if (!running_) {
+ sendHttp503(conn);
+ return;
+ }
+
+ request_buffer_.enqueue(std::move(req));
+ }
+
+ auto req_result = req_triggered.get();
+ if (!req_result) {
sendHttp503(conn);
+ req_result.error().ret.set_value();
return;
}
+ auto& [session, req_done] = *req_result;
+
+ auto flow_file = session.get().create();
+ if (flow_id_) {
+ flow_file->setAttribute(core::SpecialFlowAttribute::FLOW_ID,
flow_id_.value());
+ }
+
+ if (write_body) {
+ session.get().write(flow_file, [&] (auto& out) {
+ std::optional<size_t> request_size = std::nullopt;
+ if (req_info->content_length > 0) { request_size =
gsl::narrow<size_t>(req_info->content_length); }
+ MgConnectionInputStream mg_body{conn, request_size};
+ return minifi::internal::pipe(mg_body, *out);
+ });
+ }
+
+ setHeaderAttributes(req_info, *flow_file);
mg_printf(conn, "HTTP/1.1 200 OK\r\n");
- writeBody(conn, req_info);
+ writeBody(&session.get(), conn, req_info);
+
+ session.get().transfer(flow_file, Success);
+
+ req_done.set_value();
}
bool ListenHTTP::Handler::handlePost(CivetServer* /*server*/, struct
mg_connection *conn) {
@@ -305,7 +400,7 @@ bool ListenHTTP::Handler::handlePost(CivetServer*
/*server*/, struct mg_connecti
// Always send 100 Continue, as allowed per standard to minimize client
delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html)
mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n");
- enqueueRequest(conn, req_info, createContentBuffer(conn, req_info));
+ enqueueRequest(conn, req_info, true);
return true;
}
@@ -336,7 +431,7 @@ bool ListenHTTP::Handler::handleGet(CivetServer*
/*server*/, struct mg_connectio
return true;
}
- enqueueRequest(conn, req_info, nullptr);
+ enqueueRequest(conn, req_info, false);
return true;
}
@@ -353,7 +448,7 @@ bool ListenHTTP::Handler::handleHead(CivetServer*
/*server*/, struct mg_connecti
}
mg_printf(conn, "HTTP/1.1 200 OK\r\n");
- writeBody(conn, req_info, false /*include_payload*/);
+ writeBody(nullptr, conn, req_info);
return true;
}
@@ -372,26 +467,28 @@ bool ListenHTTP::Handler::handleDelete(CivetServer*
/*server*/, struct mg_connec
return true;
}
-void ListenHTTP::Handler::setResponseBody(const ResponseBody& response) {
+bool ListenHTTP::Handler::setResponseBody(const ResponseBody& response) {
std::lock_guard<std::mutex> guard(uri_map_mutex_);
- if (response.body.empty()) {
+ if (response.flow_file->getSize() == 0) {
logger_->log_info("Unregistering response body for URI '{}'",
response.uri);
response_uri_map_.erase(response.uri);
+ return false;
} else {
logger_->log_info("Registering response body for URI '{}' of length {}",
response.uri,
- response.body.size());
+ response.flow_file->getSize());
response_uri_map_[response.uri] = response;
+ return true;
}
}
-bool ListenHTTP::Handler::dequeueRequest(FlowFileBufferPair
&flow_file_buffer_pair) {
- return request_buffer_.tryDequeue(flow_file_buffer_pair);
+bool ListenHTTP::Handler::dequeueRequest(Request& req) {
+ return request_buffer_.tryDequeue(req);
}
-void ListenHTTP::Handler::writeBody(mg_connection *conn, const mg_request_info
*req_info, bool include_payload /*=true*/) {
+void ListenHTTP::Handler::writeBody(core::ProcessSession* payload_reader,
mg_connection *conn, const mg_request_info *req_info) {
const auto &request_uri_str = std::string(req_info->request_uri);
if (request_uri_str.size() > base_uri_.size() + 1) {
@@ -406,16 +503,19 @@ void ListenHTTP::Handler::writeBody(mg_connection *conn,
const mg_request_info *
}
}
- if (!response.body.empty()) {
- logger_->log_debug("Writing response body of {} bytes for URI: {}",
response.body.size(), req_info->request_uri);
+ if (response.flow_file && response.flow_file->getSize() != 0) {
+ logger_->log_debug("Writing response body of {} bytes for URI: {}",
response.flow_file->getSize(), req_info->request_uri);
mg_printf(conn, "Content-type: ");
mg_printf(conn, "%s", response.mime_type.c_str());
mg_printf(conn, "\r\n");
mg_printf(conn, "Content-length: ");
- mg_printf(conn, "%s", std::to_string(response.body.size()).c_str());
+ mg_printf(conn, "%s",
std::to_string(response.flow_file->getSize()).c_str());
mg_printf(conn, "\r\n\r\n");
- if (include_payload) {
- mg_write(conn, reinterpret_cast<char*>(response.body.data()),
response.body.size());
+ if (payload_reader) {
+ payload_reader->read(response.flow_file, [&] (auto& content) {
+ MgConnectionOutputStream out{conn};
+ return minifi::internal::pipe(*content, out);
+ });
}
} else {
logger_->log_debug("No response body available for URI: {}",
req_info->request_uri);
@@ -427,36 +527,6 @@ void ListenHTTP::Handler::writeBody(mg_connection *conn,
const mg_request_info *
}
}
-std::unique_ptr<io::BufferStream>
ListenHTTP::Handler::createContentBuffer(struct mg_connection *conn, const
struct mg_request_info *req_info) {
- auto content_buffer = std::make_unique<io::BufferStream>();
- size_t nlen = 0;
- int64_t tlen = req_info->content_length;
- std::array<uint8_t, 16384> buf{};
-
- // if we have no content length we should call mg_read until
- // there is no data left from the stream to be HTTP/1.1 compliant
- while (tlen == -1 || (tlen > 0 && nlen < gsl::narrow<size_t>(tlen))) {
- auto rlen = tlen == -1 ? buf.size() : gsl::narrow<size_t>(tlen) - nlen;
- if (rlen > buf.size()) {
- rlen = buf.size();
- }
-
- // Read a buffer of data from client
- const auto mg_read_return = mg_read(conn, buf.data(), rlen);
- if (mg_read_return <= 0) {
- break;
- }
- rlen = gsl::narrow<size_t>(mg_read_return);
-
- // Transfer buffer data to the output stream
- content_buffer->write(buf.data(), rlen);
-
- nlen += rlen;
- }
-
- return content_buffer;
-}
-
bool ListenHTTP::isSecure() const {
return (listeningPort.length() > 0) && *listeningPort.rbegin() == 's';
}
@@ -469,10 +539,22 @@ std::string ListenHTTP::getPort() const {
}
void ListenHTTP::notifyStop() {
+ if (handler_) {
+ handler_->stop();
+ }
+
server_.reset();
handler_.reset();
}
+std::set<core::Connectable*> ListenHTTP::getOutGoingConnections(const
std::string &relationship) {
+ auto result = core::Processor::getOutGoingConnections(relationship);
+ if (relationship == Self.name) {
+ result.insert(this);
+ }
+ return result;
+}
+
REGISTER_RESOURCE(ListenHTTP, Processor);
} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/civetweb/processors/ListenHTTP.h
b/extensions/civetweb/processors/ListenHTTP.h
index 4d149fac3..1567c387a 100644
--- a/extensions/civetweb/processors/ListenHTTP.h
+++ b/extensions/civetweb/processors/ListenHTTP.h
@@ -39,15 +39,21 @@
#include "utils/gsl.h"
#include "utils/Export.h"
#include "utils/RegexUtils.h"
+#include "core/FlowFileStore.h"
+
+namespace org::apache::nifi::minifi::test {
+struct ListenHTTPTestAccessor;
+} // namespace org::apache::nifi::minifi::test
namespace org::apache::nifi::minifi::processors {
class ListenHTTP : public core::Processor {
private:
- static constexpr std::string_view DEFAULT_BUFFER_SIZE_STR = "20000";
+ static constexpr std::string_view DEFAULT_BUFFER_SIZE_STR = "5";
+ static constexpr core::RelationshipDefinition Self{"__self__", "Marks the
FlowFile to be owned by this processor"};
public:
- using FlowFileBufferPair = std::pair<std::shared_ptr<FlowFileRecord>,
std::unique_ptr<io::BufferStream>>;
+ friend struct ::org::apache::nifi::minifi::test::ListenHTTPTestAccessor;
explicit ListenHTTP(std::string_view name, const utils::Identifier& uuid =
{})
: Processor(name, uuid) {
@@ -141,18 +147,39 @@ class ListenHTTP : public core::Processor {
void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&
session_factory) override;
std::string getPort() const;
bool isSecure() const;
+ void restore(const std::shared_ptr<core::FlowFile>& flowFile) override;
+
+ bool isWorkAvailable() override {
+ return handler_ ? !handler_->empty() : false;
+ }
+
+ std::set<core::Connectable*> getOutGoingConnections(const std::string
&relationship) override;
struct ResponseBody {
std::string uri;
std::string mime_type;
- std::vector<std::byte> body;
+ std::shared_ptr<core::FlowFile> flow_file;
};
// HTTP request handler
class Handler : public CivetHandler {
public:
+ enum class FailureReason {
+ PROCESSOR_SHUTDOWN
+ };
+ struct RequestValue {
+ std::reference_wrapper<core::ProcessSession> session;
+ std::promise<void> ret;
+ };
+ struct FailureValue {
+ FailureReason reason;
+ std::promise<void> ret;
+ };
+ using Request = std::promise<nonstd::expected<RequestValue, FailureValue>>;
+
Handler(std::string base_uri,
- core::ProcessContext *context,
+ std::optional<std::string> flow_id,
+ uint64_t buffer_size,
std::string &&auth_dn_regex,
std::optional<utils::Regex> &&headers_as_attrs_regex);
bool handlePost(CivetServer *server, struct mg_connection *conn) override;
@@ -165,28 +192,49 @@ class ListenHTTP : public core::Processor {
* Sets a static response body string to be used for a given URI, with a
number of seconds it will be kept in memory.
* @param response
*/
- void setResponseBody(const ResponseBody& response);
+ bool setResponseBody(const ResponseBody& response);
+
+ bool dequeueRequest(Request& req);
- bool dequeueRequest(FlowFileBufferPair &flow_file_buffer_pair);
+ size_t requestCount() const {
+ return request_buffer_.size();
+ }
+
+ bool empty() const {
+ return request_buffer_.empty();
+ }
+
+ void stop() {
+ std::lock_guard lock(request_mtx_);
+ running_ = false;
+ Request req;
+ while (dequeueRequest(req)) {
+ std::promise<void> req_done_promise;
+ auto req_done = req_done_promise.get_future();
+
req.set_value(nonstd::make_unexpected(FailureValue{Handler::FailureReason::PROCESSOR_SHUTDOWN,
std::move(req_done_promise)}));
+ req_done.wait();
+ }
+ }
private:
static void sendHttp500(struct mg_connection *conn);
static void sendHttp503(struct mg_connection *conn);
bool authRequest(mg_connection *conn, const mg_request_info *req_info)
const;
void setHeaderAttributes(const mg_request_info *req_info, core::FlowFile&
flow_file) const;
- void writeBody(mg_connection *conn, const mg_request_info *req_info, bool
include_payload = true);
- static std::unique_ptr<io::BufferStream> createContentBuffer(struct
mg_connection *conn, const struct mg_request_info *req_info);
- void enqueueRequest(mg_connection *conn, const mg_request_info *req_info,
std::unique_ptr<io::BufferStream>);
+ void writeBody(core::ProcessSession* payload_reader, mg_connection *conn,
const mg_request_info *req_info);
+ void enqueueRequest(mg_connection *conn, const mg_request_info *req_info,
bool write_body);
std::string base_uri_;
+ std::optional<std::string> flow_id_;
utils::Regex auth_dn_regex_;
std::optional<utils::Regex> headers_as_attrs_regex_;
- core::ProcessContext *process_context_;
std::shared_ptr<core::logging::Logger> logger_ =
core::logging::LoggerFactory<ListenHTTP>::getLogger();
std::map<std::string, ResponseBody> response_uri_map_;
std::mutex uri_map_mutex_;
- uint64_t buffer_size_ = 0;
- utils::ConcurrentQueue<FlowFileBufferPair> request_buffer_;
+ uint64_t buffer_size_{0};
+ std::mutex request_mtx_;
+ bool running_{true};
+ utils::ConcurrentQueue<Request> request_buffer_;
};
static int logMessage(const struct mg_connection *conn, const char *message)
{
@@ -230,7 +278,11 @@ class ListenHTTP : public core::Processor {
private:
bool processIncomingFlowFile(core::ProcessSession &session);
+ bool processFlowFile(const std::shared_ptr<core::FlowFile>& flow_file);
bool processRequestBuffer(core::ProcessSession &session);
+ size_t pendingRequestCount() {
+ return handler_ ? handler_->requestCount() : 0;
+ }
std::shared_ptr<core::logging::Logger> logger_ =
core::logging::LoggerFactory<ListenHTTP>::getLogger(uuid_);
CivetCallbacks callbacks_;
@@ -238,6 +290,7 @@ class ListenHTTP : public core::Processor {
std::unique_ptr<Handler> handler_;
std::string listeningPort;
uint64_t batch_size_{0};
+ core::FlowFileStore file_store_;
};
} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/civetweb/tests/ListenHTTPTests.cpp
b/extensions/civetweb/tests/ListenHTTPTests.cpp
index 91beabff6..cc9de6f54 100644
--- a/extensions/civetweb/tests/ListenHTTPTests.cpp
+++ b/extensions/civetweb/tests/ListenHTTPTests.cpp
@@ -37,9 +37,14 @@
#include "SchedulingAgent.h"
#include "core/ProcessGroup.h"
#include "unit/SingleProcessorTestController.h"
+#include "unit/TestUtils.h"
namespace org::apache::nifi::minifi::test {
+struct ListenHTTPTestAccessor {
+ METHOD_ACCESSOR(pendingRequestCount)
+};
+
using namespace std::literals::chrono_literals;
using HttpRequestMethod = org::apache::nifi::minifi::http::HttpRequestMethod;
@@ -105,6 +110,7 @@ class ListenHTTPTestsFixture {
// Configure ListenHTTP processor
plan->setProperty(listen_http, minifi::processors::ListenHTTP::Port, "0");
+ listen_http->setMaxConcurrentTasks(10);
plan->setProperty(log_attribute,
minifi::processors::LogAttribute::FlowFilesToLog, "0");
}
@@ -149,41 +155,25 @@ class ListenHTTPTestsFixture {
url = protocol + "://localhost:" + portstr + "/contentListener/" +
endpoint;
}
- void initialize_client() {
- if (client != nullptr) {
- return;
- }
-
- client = std::make_unique<minifi::http::HTTPClient>();
- client->initialize(method, url, ssl_context_service);
- client->setVerbose(false);
- for (const auto &header : headers) {
- client->setRequestHeader(header.first, header.second);
- }
- if (method == HttpRequestMethod::POST) {
- client->setPostFields(payload);
- }
- }
-
- void check_content_type() {
+ void check_content_type(minifi::http::HTTPClient& client) {
if (endpoint == "test") {
std::string content_type;
if (!update_attribute->getDynamicProperty("mime.type", content_type)) {
content_type = "application/octet-stream";
}
- REQUIRE(content_type ==
minifi::utils::string::trim(client->getResponseHeaderMap().at("Content-type")));
- REQUIRE("19" ==
minifi::utils::string::trim(client->getResponseHeaderMap().at("Content-length")));
+ REQUIRE(content_type ==
minifi::utils::string::trim(client.getResponseHeaderMap().at("Content-type")));
+ REQUIRE("19" ==
minifi::utils::string::trim(client.getResponseHeaderMap().at("Content-length")));
} else {
- REQUIRE("0" ==
minifi::utils::string::trim(client->getResponseHeaderMap().at("Content-length")));
+ REQUIRE("0" ==
minifi::utils::string::trim(client.getResponseHeaderMap().at("Content-length")));
}
}
- void check_response_body() {
- if (method != HttpRequestMethod::GET && method != HttpRequestMethod::POST)
{
+ void check_response_body(minifi::http::HTTPClient& client) {
+ if (client.getMethod() != HttpRequestMethod::GET && client.getMethod() !=
HttpRequestMethod::POST) {
return;
}
- const auto &body_chars = client->getResponseBody();
+ const auto &body_chars = client.getResponseBody();
std::string response_body(body_chars.data(), body_chars.size());
if (endpoint == "test") {
REQUIRE("Hello response body" == response_body);
@@ -192,38 +182,77 @@ class ListenHTTPTestsFixture {
}
}
- void check_response(const bool success, const HttpResponseExpectations&
expect) {
+ void check_response(const bool success, const HttpResponseExpectations&
expect, minifi::http::HTTPClient& client) {
if (!expect.should_succeed) {
REQUIRE(!success);
- REQUIRE(expect.response_code == client->getResponseCode());
+ REQUIRE(expect.response_code == client.getResponseCode());
return;
}
REQUIRE(success);
- REQUIRE(expect.response_code == client->getResponseCode());
+ REQUIRE(expect.response_code == client.getResponseCode());
if (expect.response_code != 200) {
return;
}
- check_content_type();
- check_response_body();
+ check_content_type(client);
+ check_response_body(client);
}
- void test_connect(const std::vector<HttpResponseExpectations>&
response_expectaitons = {HttpResponseExpectations{}}, std::size_t
expected_commited_requests = 1) {
- initialize_client();
+ void test_connect(
+ const std::vector<HttpResponseExpectations>& response_expectations =
{HttpResponseExpectations{}},
+ std::size_t expected_commited_requests = 1,
+ std::unique_ptr<minifi::http::HTTPClient> client_to_use = {}) {
+ if (client_to_use) {
+ REQUIRE(response_expectations.size() == 1);
+ }
+ auto* proc =
dynamic_cast<minifi::processors::ListenHTTP*>(plan->getCurrentContext()->getProcessorNode()->getProcessor());
+ REQUIRE(proc);
+
+ std::vector<std::thread> client_threads;
- for (const auto& expect : response_expectaitons) {
- check_response(client->submit(), expect);
+ for (auto& expect : response_expectations) {
+ size_t prev_req_count =
ListenHTTPTestAccessor::call_pendingRequestCount(*proc);
+ auto thread_done_flag = std::make_shared<std::atomic_bool>(false);
+ client_threads.emplace_back([&, thread_done_flag] {
+ auto client = client_to_use ? std::move(client_to_use) :
initialize_client();
+ std::cout << "Submitting request" << std::endl;
+ check_response(client->submit(), expect, *client);
+ thread_done_flag->store(true);
+ });
+ while (!thread_done_flag->load() &&
ListenHTTPTestAccessor::call_pendingRequestCount(*proc) != prev_req_count + 1) {
+ std::this_thread::sleep_for(1ms);
+ }
}
plan->runCurrentProcessor(); // ListenHTTP
plan->runNextProcessor(); // LogAttribute
+ // shutdown processors so pending requests are correctly discarded
+ plan.reset();
+
+ for (auto& thread : client_threads) {
+ thread.join();
+ }
+
if (expected_commited_requests > 0 && (method == HttpRequestMethod::GET ||
method == HttpRequestMethod::POST)) {
REQUIRE(LogTestController::getInstance().contains("Size:" +
std::to_string(payload.size()) + " Offset:0"));
}
REQUIRE(LogTestController::getInstance().contains("Logged " +
std::to_string(expected_commited_requests) + " flow files"));
}
+ std::unique_ptr<minifi::http::HTTPClient> initialize_client() {
+ auto client = std::make_unique<minifi::http::HTTPClient>();
+ client->initialize(method, url, ssl_context_service);
+ client->setVerbose(false);
+ for (const auto &header : headers) {
+ client->setRequestHeader(header.first, header.second);
+ }
+ if (method == HttpRequestMethod::POST) {
+ client->setPostFields(payload);
+ }
+ return client;
+ }
+
protected:
std::filesystem::path tmp_dir;
TestController testController;
@@ -239,7 +268,6 @@ class ListenHTTPTestsFixture {
std::string payload;
std::string endpoint = "test";
std::string url;
- std::unique_ptr<minifi::http::HTTPClient> client;
std::size_t batch_size_ = 0;
std::size_t buffer_size_ = 0;
};
@@ -398,7 +426,7 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTP Batch
tests", "[batch]") {
SECTION("Batch size smaller than request count") {
batch_size_ = 4;
- create_requests(5, 0);
+ create_requests(4, 1);
expected_processed_request_count = 4;
SECTION("GET") {
@@ -659,7 +687,7 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS minimum SSL
version", "[https]")
run_server();
- client = std::make_unique<minifi::http::HTTPClient>();
+ auto client = std::make_unique<minifi::http::HTTPClient>();
client->setVerbose(false);
client->initialize(method, url, ssl_context_service);
if (method == HttpRequestMethod::POST) {
@@ -667,7 +695,7 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS minimum SSL
version", "[https]")
}
REQUIRE(client->setSpecificSSLVersion(minifi::http::SSLVersion::TLSv1_1));
- test_connect({HttpResponseExpectations{false, 0}}, 0);
+ test_connect({HttpResponseExpectations{false, 0}}, 0, std::move(client));
}
TEST_CASE("ListenHTTP bored yield", "[listenhttp][bored][yield]") {
diff --git
a/extensions/standard-processors/tests/integration/InvokeHTTPTests.cpp
b/extensions/standard-processors/tests/integration/InvokeHTTPTests.cpp
index 5b87e3b3a..b0872b8a1 100644
--- a/extensions/standard-processors/tests/integration/InvokeHTTPTests.cpp
+++ b/extensions/standard-processors/tests/integration/InvokeHTTPTests.cpp
@@ -33,230 +33,62 @@
#include "processors/LogAttribute.h"
#include "unit/SingleProcessorTestController.h"
#include "integration/ConnectionCountingServer.h"
+#include "unit/TestUtils.h"
namespace org::apache::nifi::minifi::test {
class TestHTTPServer {
public:
- TestHTTPServer();
+ explicit TestHTTPServer(TestController& test_controller) {
+
LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::ListenHTTP>();
+
LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::LogAttribute>();
+
+ test_plan_ = test_controller.createPlan();
+
+ listen_http_ =
dynamic_cast<processors::ListenHTTP*>(test_plan_->addProcessor("ListenHTTP",
PROCESSOR_NAME));
+ log_attribute_ =
dynamic_cast<processors::LogAttribute*>(test_plan_->addProcessor("LogAttribute",
"LogAttribute", core::Relationship("success", "description"), true));
+ REQUIRE(listen_http_);
+ REQUIRE(log_attribute_);
+ test_plan_->setProperty(listen_http_,
org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "testytesttest");
+ test_plan_->setProperty(listen_http_,
org::apache::nifi::minifi::processors::ListenHTTP::Port, "8681");
+ test_plan_->setProperty(listen_http_,
org::apache::nifi::minifi::processors::ListenHTTP::HeadersAsAttributesRegex,
".*");
+ test_plan_->runProcessor(listen_http_);
+ test_plan_->runProcessor(log_attribute_);
+ thread_ = std::thread{[this] {
+ while (running_) {
+ if (listen_http_->isWorkAvailable()) {
+ test_plan_->runProcessor(listen_http_);
+ test_plan_->runProcessor(log_attribute_);
+ }
+ }
+ }};
+ }
+ TestHTTPServer(const TestHTTPServer&) = delete;
+ TestHTTPServer(TestHTTPServer&&) = delete;
+ TestHTTPServer& operator=(const TestHTTPServer&) = delete;
+ TestHTTPServer& operator=(TestHTTPServer&&) = delete;
+
static constexpr const char* PROCESSOR_NAME = "my_http_server";
static constexpr const char* URL = "http://localhost:8681/testytesttest";
- void trigger() {
-
LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::ListenHTTP>();
-
LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::LogAttribute>();
- test_plan_->reset();
- test_controller_.runSession(test_plan_);
+ ~TestHTTPServer() {
+ running_ = false;
+ thread_.join();
}
private:
- TestController test_controller_;
processors::ListenHTTP* listen_http_ = nullptr;
processors::LogAttribute* log_attribute_ = nullptr;
- std::shared_ptr<TestPlan> test_plan_ = test_controller_.createPlan();
+ std::shared_ptr<TestPlan> test_plan_;
+ std::thread thread_;
+ std::atomic_bool running_{true};
};
-TestHTTPServer::TestHTTPServer() {
-
LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::ListenHTTP>();
-
LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::LogAttribute>();
-
- listen_http_ =
dynamic_cast<processors::ListenHTTP*>(test_plan_->addProcessor("ListenHTTP",
PROCESSOR_NAME));
- log_attribute_ =
dynamic_cast<processors::LogAttribute*>(test_plan_->addProcessor("LogAttribute",
"LogAttribute", core::Relationship("success", "description"), true));
- REQUIRE(listen_http_);
- REQUIRE(log_attribute_);
- test_plan_->setProperty(listen_http_,
org::apache::nifi::minifi::processors::ListenHTTP::BasePath, "testytesttest");
- test_plan_->setProperty(listen_http_,
org::apache::nifi::minifi::processors::ListenHTTP::Port, "8681");
- test_plan_->setProperty(listen_http_,
org::apache::nifi::minifi::processors::ListenHTTP::HeadersAsAttributesRegex,
".*");
- test_controller_.runSession(test_plan_);
-}
-
-TEST_CASE("HTTPTestsWithNoResourceClaimPOST", "[httptest1]") {
- TestController testController;
- TestHTTPServer http_server;
-
-
LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::InvokeHTTP>();
-
- std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
- std::shared_ptr<TestRepository> repo = std::make_shared<TestRepository>();
-
- std::shared_ptr<core::Processor> invokehttp =
std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp");
- invokehttp->initialize();
-
- minifi::utils::Identifier invokehttp_uuid = invokehttp->getUUID();
- REQUIRE(invokehttp_uuid);
-
- auto node = std::make_shared<core::ProcessorNode>(invokehttp.get());
- auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo,
repo, content_repo);
-
-
context->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method,
"POST");
- context->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL,
TestHTTPServer::URL);
-
- auto session = std::make_shared<core::ProcessSession>(context);
-
- invokehttp->incrementActiveTasks();
- invokehttp->setScheduledState(core::ScheduledState::RUNNING);
- auto factory2 = std::make_shared<core::ProcessSessionFactory>(context);
- invokehttp->onSchedule(*context, *factory2);
- invokehttp->onTrigger(*context, *session);
-
- auto reporter = session->getProvenanceReporter();
- auto records = reporter->getEvents();
- auto record = session->get();
- REQUIRE(record == nullptr);
- REQUIRE(records.empty());
-
- reporter = session->getProvenanceReporter();
-
- records = reporter->getEvents();
- session->commit();
-
- invokehttp->incrementActiveTasks();
- invokehttp->setScheduledState(core::ScheduledState::RUNNING);
- invokehttp->onTrigger(*context, *session);
-
- session->commit();
- records = reporter->getEvents();
- // FIXME(fgerlits): this test is very weak, as `records` is empty
- for (const auto& provEventRecord : records) {
- REQUIRE(provEventRecord->getComponentType() ==
TestHTTPServer::PROCESSOR_NAME);
- }
-
- REQUIRE(LogTestController::getInstance().contains("Exiting because method is
POST"));
-}
-
-TEST_CASE("HTTPTestsWithResourceClaimPOST", "[httptest1]") {
- TestController testController;
-
-
LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::InvokeHTTP>();
-
- auto repo = std::make_shared<TestRepository>();
-
- std::shared_ptr<core::Processor> listenhttp =
std::make_shared<org::apache::nifi::minifi::processors::ListenHTTP>("listenhttp");
- listenhttp->initialize();
-
- std::shared_ptr<core::Processor> invokehttp =
std::make_shared<org::apache::nifi::minifi::processors::InvokeHTTP>("invokehttp");
- invokehttp->initialize();
-
- minifi::utils::Identifier processoruuid = listenhttp->getUUID();
- REQUIRE(processoruuid);
-
- minifi::utils::Identifier invokehttp_uuid = invokehttp->getUUID();
- REQUIRE(invokehttp_uuid);
-
- auto configuration = std::make_shared<minifi::Configure>();
- std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
- content_repo->initialize(configuration);
-
- auto connection = std::make_shared<minifi::Connection>(repo, content_repo,
"getfileCreate2Connection");
- connection->addRelationship(core::Relationship("success", "description"));
-
- auto connection2 = std::make_shared<minifi::Connection>(repo, content_repo,
"listenhttp");
-
- connection2->addRelationship(core::Relationship("No Retry", "description"));
-
- // link the connections so that we can test results at the end for this
- connection->setSource(listenhttp.get());
- connection->setSourceUUID(invokehttp_uuid);
- connection->setDestinationUUID(processoruuid);
- connection2->setSourceUUID(processoruuid);
-
- listenhttp->addConnection(connection.get());
- invokehttp->addConnection(connection.get());
- invokehttp->addConnection(connection2.get());
-
- auto node = std::make_shared<core::ProcessorNode>(listenhttp.get());
- auto node2 = std::make_shared<core::ProcessorNode>(invokehttp.get());
- auto context = std::make_shared<core::ProcessContext>(node, nullptr, repo,
repo, content_repo);
- auto context2 = std::make_shared<core::ProcessContext>(node2, nullptr, repo,
repo, content_repo);
-
context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::Port,
"8680");
-
context->setProperty(org::apache::nifi::minifi::processors::ListenHTTP::BasePath,
"/testytesttest");
-
-
context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::Method,
"POST");
-
context2->setProperty(org::apache::nifi::minifi::processors::InvokeHTTP::URL,
"http://localhost:8680/testytesttest");
- auto session = std::make_shared<core::ProcessSession>(context);
- auto session2 = std::make_shared<core::ProcessSession>(context2);
-
- REQUIRE(listenhttp->getName() == "listenhttp");
-
- auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-
- std::shared_ptr<core::FlowFile> record;
-
- invokehttp->incrementActiveTasks();
- invokehttp->setScheduledState(core::ScheduledState::RUNNING);
- auto factory2 = std::make_shared<core::ProcessSessionFactory>(context2);
- invokehttp->onSchedule(*context2, *factory2);
- invokehttp->onTrigger(*context2, *session2);
-
- listenhttp->incrementActiveTasks();
- listenhttp->setScheduledState(core::ScheduledState::RUNNING);
- listenhttp->onSchedule(*context, *factory);
- listenhttp->onTrigger(*context, *session);
-
- auto reporter = session->getProvenanceReporter();
- auto records = reporter->getEvents();
- record = session->get();
- REQUIRE(record == nullptr);
- REQUIRE(records.empty());
-
- listenhttp->incrementActiveTasks();
- listenhttp->setScheduledState(core::ScheduledState::RUNNING);
- listenhttp->onTrigger(*context, *session);
-
- reporter = session->getProvenanceReporter();
-
- records = reporter->getEvents();
- session->commit();
-
- invokehttp->incrementActiveTasks();
- invokehttp->setScheduledState(core::ScheduledState::RUNNING);
- invokehttp->onTrigger(*context2, *session2);
-
- session2->commit();
- records = reporter->getEvents();
- // FIXME(fgerlits): this test is very weak, as `records` is empty
- for (const auto& provEventRecord : records) {
- REQUIRE(provEventRecord->getComponentType() == listenhttp->getName());
- }
-
- REQUIRE(true == LogTestController::getInstance().contains("Exiting because
method is POST"));
-}
-
-TEST_CASE("HTTPTestsPostNoResourceClaim", "[httptest1]") {
- TestController testController;
- TestHTTPServer http_server;
-
-
LogTestController::getInstance().setDebug<org::apache::nifi::minifi::processors::InvokeHTTP>();
-
- std::shared_ptr<TestPlan> plan = testController.createPlan();
- auto invokehttp = plan->addProcessor("InvokeHTTP", "invokehttp");
-
- plan->setProperty(invokehttp,
org::apache::nifi::minifi::processors::InvokeHTTP::Method, "POST");
- plan->setProperty(invokehttp,
org::apache::nifi::minifi::processors::InvokeHTTP::URL, TestHTTPServer::URL);
- testController.runSession(plan);
-
- auto records = plan->getProvenanceRecords();
- std::shared_ptr<core::FlowFile> record = plan->getCurrentFlowFile();
- REQUIRE(record == nullptr);
- REQUIRE(records.empty());
-
- plan->reset();
- testController.runSession(plan);
-
- records = plan->getProvenanceRecords();
- // FIXME(fgerlits): this test is very weak, as `records` is empty
- for (const auto& provEventRecord : records) {
- REQUIRE(provEventRecord->getComponentType() ==
TestHTTPServer::PROCESSOR_NAME);
- }
-
- REQUIRE(true == LogTestController::getInstance().contains("Exiting because
method is POST"));
-}
-
TEST_CASE("HTTPTestsPenalizeNoRetry", "[httptest1]") {
using minifi::processors::InvokeHTTP;
TestController testController;
- TestHTTPServer http_server;
+ TestHTTPServer http_server(testController);
LogTestController::getInstance().setInfo<minifi::core::ProcessSession>();
@@ -285,11 +117,12 @@ TEST_CASE("HTTPTestsPenalizeNoRetry", "[httptest1]") {
TEST_CASE("InvokeHTTP fails with when flow contains invalid attribute names in
HTTP headers", "[httptest1]") {
using minifi::processors::InvokeHTTP;
- TestHTTPServer http_server;
- LogTestController::getInstance().setDebug<InvokeHTTP>();
test::SingleProcessorTestController
test_controller{std::make_unique<InvokeHTTP>("InvokeHTTP")};
auto invokehttp = test_controller.getProcessor();
+ TestHTTPServer http_server(test_controller);
+
+ LogTestController::getInstance().setDebug<InvokeHTTP>();
invokehttp->setProperty(InvokeHTTP::Method, "GET");
invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL);
@@ -305,11 +138,12 @@ TEST_CASE("InvokeHTTP fails with when flow contains
invalid attribute names in H
TEST_CASE("InvokeHTTP succeeds when the flow file contains an attribute that
would be invalid as an HTTP header, and the policy is FAIL, but the attribute
is not matched",
"[httptest1][invokehttp][httpheader][attribute]") {
using minifi::processors::InvokeHTTP;
- TestHTTPServer http_server;
- LogTestController::getInstance().setDebug<InvokeHTTP>();
test::SingleProcessorTestController
test_controller{std::make_unique<InvokeHTTP>("InvokeHTTP")};
auto invokehttp = test_controller.getProcessor();
+ TestHTTPServer http_server(test_controller);
+
+ LogTestController::getInstance().setDebug<InvokeHTTP>();
invokehttp->setProperty(InvokeHTTP::Method, "GET");
invokehttp->setProperty(InvokeHTTP::URL, TestHTTPServer::URL);
@@ -320,17 +154,17 @@ TEST_CASE("InvokeHTTP succeeds when the flow file
contains an attribute that wou
REQUIRE(result.at(InvokeHTTP::RelFailure).empty());
const auto& success_contents = result.at(InvokeHTTP::Success);
REQUIRE(success_contents.size() == 1);
- http_server.trigger();
+ REQUIRE(utils::verifyLogLinePresenceInPollTime(1s, "key:valid-header
value:value2"));
REQUIRE_FALSE(LogTestController::getInstance().contains("key:invalid"));
- REQUIRE(LogTestController::getInstance().contains("key:valid-header
value:value2"));
}
TEST_CASE("InvokeHTTP replaces invalid characters of attributes",
"[httptest1]") {
using minifi::processors::InvokeHTTP;
- TestHTTPServer http_server;
test::SingleProcessorTestController
test_controller{std::make_unique<InvokeHTTP>("InvokeHTTP")};
auto invokehttp = test_controller.getProcessor();
+ TestHTTPServer http_server(test_controller);
+
LogTestController::getInstance().setTrace<InvokeHTTP>();
invokehttp->setProperty(InvokeHTTP::Method, "GET");
@@ -341,17 +175,17 @@ TEST_CASE("InvokeHTTP replaces invalid characters of
attributes", "[httptest1]")
auto file_contents = result.at(InvokeHTTP::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
- http_server.trigger();
- REQUIRE(LogTestController::getInstance().contains("key:invalid-header
value:value"));
-
REQUIRE(LogTestController::getInstance().contains("key:X-MiNiFi-Empty-Attribute-Name
value:value2"));
+ REQUIRE(utils::verifyLogLinePresenceInPollTime(1s, "key:invalid-header
value:value"));
+ REQUIRE(utils::verifyLogLinePresenceInPollTime(1s,
"key:X-MiNiFi-Empty-Attribute-Name value:value2"));
}
TEST_CASE("InvokeHTTP drops invalid attributes from HTTP headers",
"[httptest1]") {
using minifi::processors::InvokeHTTP;
- TestHTTPServer http_server;
test::SingleProcessorTestController
test_controller{std::make_unique<InvokeHTTP>("InvokeHTTP")};
auto invokehttp = test_controller.getProcessor();
+ TestHTTPServer http_server(test_controller);
+
LogTestController::getInstance().setTrace<InvokeHTTP>();
invokehttp->setProperty(InvokeHTTP::Method, "GET");
@@ -363,17 +197,17 @@ TEST_CASE("InvokeHTTP drops invalid attributes from HTTP
headers", "[httptest1]"
auto file_contents = result.at(InvokeHTTP::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
- http_server.trigger();
- REQUIRE(LogTestController::getInstance().contains("key:legit-header
value:value1"));
+ REQUIRE(utils::verifyLogLinePresenceInPollTime(1s, "key:legit-header
value:value1"));
REQUIRE_FALSE(LogTestController::getInstance().contains("key:invalid", 0s));
}
TEST_CASE("InvokeHTTP empty Attributes to Send means no attributes are sent",
"[httptest1]") {
using minifi::processors::InvokeHTTP;
- TestHTTPServer http_server;
test::SingleProcessorTestController
test_controller{std::make_unique<InvokeHTTP>("InvokeHTTP")};
auto invokehttp = test_controller.getProcessor();
+ TestHTTPServer http_server(test_controller);
+
LogTestController::getInstance().setTrace<InvokeHTTP>();
invokehttp->setProperty(InvokeHTTP::Method, "GET");
@@ -385,17 +219,17 @@ TEST_CASE("InvokeHTTP empty Attributes to Send means no
attributes are sent", "[
auto file_contents = result.at(InvokeHTTP::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
- http_server.trigger();
REQUIRE_FALSE(LogTestController::getInstance().contains("key:legit-header
value:value1"));
REQUIRE_FALSE(LogTestController::getInstance().contains("key:invalid", 0s));
}
TEST_CASE("InvokeHTTP DateHeader", "[InvokeHTTP]") {
using minifi::processors::InvokeHTTP;
- TestHTTPServer http_server;
test::SingleProcessorTestController
test_controller{std::make_unique<InvokeHTTP>("InvokeHTTP")};
auto invoke_http = test_controller.getProcessor();
+ TestHTTPServer http_server(test_controller);
+
LogTestController::getInstance().setTrace<InvokeHTTP>();
invoke_http->setProperty(InvokeHTTP::Method, "GET");
@@ -416,16 +250,16 @@ TEST_CASE("InvokeHTTP DateHeader", "[InvokeHTTP]") {
auto file_contents = result.at(InvokeHTTP::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
- http_server.trigger();
- REQUIRE(LogTestController::getInstance().contains("key:Date", 0ms) ==
date_header);
+ REQUIRE(utils::verifyEventHappenedInPollTime(1s, [&] {return
LogTestController::getInstance().contains("key:Date", 0ms) == date_header;}));
}
TEST_CASE("InvokeHTTP Attributes to Send uses full string matching, not
substring", "[httptest1]") {
using minifi::processors::InvokeHTTP;
- TestHTTPServer http_server;
test::SingleProcessorTestController
test_controller{std::make_unique<InvokeHTTP>("InvokeHTTP")};
auto invokehttp = test_controller.getProcessor();
+ TestHTTPServer http_server(test_controller);
+
LogTestController::getInstance().setTrace<InvokeHTTP>();
invokehttp->setProperty(InvokeHTTP::Method, "GET");
@@ -437,9 +271,8 @@ TEST_CASE("InvokeHTTP Attributes to Send uses full string
matching, not substrin
auto file_contents = result.at(InvokeHTTP::Success);
REQUIRE(file_contents.size() == 1);
REQUIRE(test_controller.plan->getContent(file_contents[0]) == "data");
- http_server.trigger();
+ REQUIRE(utils::verifyLogLinePresenceInPollTime(1s, "key:header
value:value2"));
REQUIRE_FALSE(LogTestController::getInstance().contains("key:header1
value:value1"));
- REQUIRE(LogTestController::getInstance().contains("key:header
value:value2"));
REQUIRE_FALSE(LogTestController::getInstance().contains("key:invalid", 0s));
}
diff --git a/libminifi/include/http/HTTPClient.h
b/libminifi/include/http/HTTPClient.h
index d34e499fe..28ee9f26f 100644
--- a/libminifi/include/http/HTTPClient.h
+++ b/libminifi/include/http/HTTPClient.h
@@ -143,6 +143,10 @@ class HTTPClient : public BaseHTTPClient, public
core::Connectable {
return response_data_.header_response.getHeaderMap();
}
+ std::optional<http::HttpRequestMethod> getMethod() const {
+ return method_;
+ }
+
void setInterface(const std::string &);
void setFollowRedirects(bool follow);
diff --git a/libminifi/include/utils/MinifiConcurrentQueue.h
b/libminifi/include/utils/MinifiConcurrentQueue.h
index 1db58080b..34c9bca27 100644
--- a/libminifi/include/utils/MinifiConcurrentQueue.h
+++ b/libminifi/include/utils/MinifiConcurrentQueue.h
@@ -57,6 +57,8 @@ class ConcurrentQueue {
return *this;
}
+ virtual ~ConcurrentQueue() = default;
+
bool tryDequeue(T& out) {
std::unique_lock<std::mutex> lck(mtx_);
return tryDequeueImpl(lck, out);