adamdebreceni commented on code in PR #1038:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1038#discussion_r930797065
##########
libminifi/src/utils/FlowFileQueue.cpp:
##########
@@ -16,59 +16,232 @@
*/
#include "utils/FlowFileQueue.h"
+#include "core/logging/LoggerConfiguration.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
+namespace org::apache::nifi::minifi::utils {
-bool FlowFileQueue::FlowFilePenaltyExpirationComparator::operator()(const
value_type& left, const value_type& right) {
- // this is operator< implemented using > so that top() is the element with
the smallest key (earliest expiration)
- // rather than the element with the largest key, which is the default for
std::priority_queue
- return left->getPenaltyExpiration() > right->getPenaltyExpiration();
+bool FlowFileQueue::FlowFilePenaltyExpirationComparator::operator()(const
value_type& left, const value_type& right) const {
+ // a flow file with earlier expiration compares less
+ return left->getPenaltyExpiration() < right->getPenaltyExpiration();
}
+bool FlowFileQueue::SwappedFlowFileComparator::operator()(const
SwappedFlowFile& left, const SwappedFlowFile& right) const {
+ // a swapped flow file with earlier expiration compares less
+ return left.to_be_processed_after < right.to_be_processed_after;
+}
+
+FlowFileQueue::FlowFileQueue(std::shared_ptr<SwapManager> swap_manager)
+ : swap_manager_(std::move(swap_manager)),
+ logger_(core::logging::LoggerFactory<FlowFileQueue>::getLogger()) {}
+
FlowFileQueue::value_type FlowFileQueue::pop() {
- if (empty()) {
- throw std::logic_error{"pop() called on an empty FlowFileQueue"};
- }
+ return tryPopImpl({}).value();
+}
- value_type next_flow_file = queue_.top();
- queue_.pop();
- return next_flow_file;
+std::optional<FlowFileQueue::value_type> FlowFileQueue::tryPop() {
+ return tryPopImpl(std::chrono::milliseconds{0});
}
-void FlowFileQueue::push(const value_type& element) {
- if (!element->isPenalized()) {
- element->penalize(std::chrono::milliseconds{0});
+std::optional<FlowFileQueue::value_type>
FlowFileQueue::tryPop(std::chrono::milliseconds timeout) {
+ return tryPopImpl(timeout);
+}
+
+std::optional<FlowFileQueue::value_type>
FlowFileQueue::tryPopImpl(std::optional<std::chrono::milliseconds> timeout) {
+ std::optional<std::shared_ptr<core::FlowFile>> result;
+ if (!queue_.empty()) {
+ result = queue_.popMin();
+ if (processLoadTaskWait(std::chrono::milliseconds{0})) {
+ initiateLoadIfNeeded();
+ }
+ return result;
+ }
+ if (load_task_) {
+ logger_->log_debug("Head is empty checking already running load task");
+ if (!processLoadTaskWait(timeout)) {
+ return std::nullopt;
+ }
+ if (!queue_.empty()) {
+ // load provided items
+ result = queue_.popMin();
+ initiateLoadIfNeeded();
+ return result;
+ }
}
+ // no pending load_task_ and no items in the queue_
+ initiateLoadIfNeeded();
+ return std::nullopt;
+}
- queue_.push(element);
+bool
FlowFileQueue::processLoadTaskWait(std::optional<std::chrono::milliseconds>
timeout) {
+ if (!load_task_) {
+ return true;
+ }
+ std::future_status status = std::future_status::ready;
+ if (timeout) {
+ status = load_task_.value().items.wait_for(timeout.value());
+ }
+ if (status == std::future_status::timeout) {
+ logger_->log_debug("Load task is not yet completed");
+ return false;
+ }
+ if (status != std::future_status::ready) {
+ throw std::logic_error("Unknown future status deferred future?");
+ }
Review Comment:
added
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]