adamdebreceni commented on code in PR #1038:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1038#discussion_r930800753
##########
libminifi/include/utils/FlowFileQueue.h:
##########
@@ -29,22 +35,78 @@ namespace minifi {
namespace utils {
class FlowFileQueue {
+ friend struct ::FlowFileQueueTestAccessor;
+ using TimePoint = std::chrono::steady_clock::time_point;
+
public:
using value_type = std::shared_ptr<core::FlowFile>;
+ explicit FlowFileQueue(std::shared_ptr<SwapManager> swap_manager = {});
+
value_type pop();
- void push(const value_type& element);
- void push(value_type&& element);
+ std::optional<value_type> tryPop();
+ std::optional<value_type> tryPop(std::chrono::milliseconds timeout);
+ void push(value_type element);
bool isWorkAvailable() const;
bool empty() const;
size_t size() const;
+ void setMinSize(size_t min_size);
+ void setTargetSize(size_t target_size);
+ void setMaxSize(size_t max_size);
+ void clear();
private:
+ std::optional<value_type>
tryPopImpl(std::optional<std::chrono::milliseconds> timeout);
+
+ void initiateLoadIfNeeded();
+
+ struct LoadTask {
+ TimePoint min;
+ TimePoint max;
+ std::future<std::vector<std::shared_ptr<core::FlowFile>>> items;
+ size_t count;
+ // flow files that have been pushed into the queue while a
+ // load was pending
+ std::vector<value_type> intermediate_items;
+
+ LoadTask(TimePoint min, TimePoint max,
std::future<std::vector<std::shared_ptr<core::FlowFile>>> items, size_t count)
+ : min(min), max(max), items(std::move(items)), count(count) {}
+
+ size_t size() const {
+ return count + intermediate_items.size();
+ }
+ };
+
+ bool processLoadTaskWait(std::optional<std::chrono::milliseconds> timeout);
+
struct FlowFilePenaltyExpirationComparator {
- bool operator()(const value_type& left, const value_type& right);
+ bool operator()(const value_type& left, const value_type& right) const;
+ };
+
+ struct SwappedFlowFileComparator {
+ bool operator()(const SwappedFlowFile& left, const SwappedFlowFile& right)
const;
};
- std::priority_queue<value_type, std::vector<value_type>,
FlowFilePenaltyExpirationComparator> queue_;
+ size_t shouldSwapOutCount() const;
+
+ size_t shouldSwapInCount() const;
+
+ std::shared_ptr<SwapManager> swap_manager_;
Review Comment:
I thought we had gotten rid of it, but `Connection` takes and stores both the flowfile
repo and the content repo by `shared_ptr`, so it would be inconsistent to take this one as a
non-owning reference. IMO this ownership should be changed/eliminated for all of them together.
##########
libminifi/include/utils/FlowFileQueue.h:
##########
@@ -29,22 +35,78 @@ namespace minifi {
namespace utils {
class FlowFileQueue {
+ friend struct ::FlowFileQueueTestAccessor;
+ using TimePoint = std::chrono::steady_clock::time_point;
+
public:
using value_type = std::shared_ptr<core::FlowFile>;
+ explicit FlowFileQueue(std::shared_ptr<SwapManager> swap_manager = {});
+
value_type pop();
- void push(const value_type& element);
- void push(value_type&& element);
+ std::optional<value_type> tryPop();
+ std::optional<value_type> tryPop(std::chrono::milliseconds timeout);
+ void push(value_type element);
bool isWorkAvailable() const;
bool empty() const;
size_t size() const;
+ void setMinSize(size_t min_size);
+ void setTargetSize(size_t target_size);
+ void setMaxSize(size_t max_size);
+ void clear();
private:
+ std::optional<value_type>
tryPopImpl(std::optional<std::chrono::milliseconds> timeout);
+
+ void initiateLoadIfNeeded();
+
+ struct LoadTask {
+ TimePoint min;
+ TimePoint max;
+ std::future<std::vector<std::shared_ptr<core::FlowFile>>> items;
+ size_t count;
+ // flow files that have been pushed into the queue while a
+ // load was pending
+ std::vector<value_type> intermediate_items;
+
+ LoadTask(TimePoint min, TimePoint max,
std::future<std::vector<std::shared_ptr<core::FlowFile>>> items, size_t count)
+ : min(min), max(max), items(std::move(items)), count(count) {}
+
+ size_t size() const {
+ return count + intermediate_items.size();
+ }
+ };
+
+ bool processLoadTaskWait(std::optional<std::chrono::milliseconds> timeout);
+
struct FlowFilePenaltyExpirationComparator {
- bool operator()(const value_type& left, const value_type& right);
+ bool operator()(const value_type& left, const value_type& right) const;
+ };
+
+ struct SwappedFlowFileComparator {
+ bool operator()(const SwappedFlowFile& left, const SwappedFlowFile& right)
const;
};
- std::priority_queue<value_type, std::vector<value_type>,
FlowFilePenaltyExpirationComparator> queue_;
+ size_t shouldSwapOutCount() const;
+
+ size_t shouldSwapInCount() const;
+
+ std::shared_ptr<SwapManager> swap_manager_;
+ // a load is initiated if the queue_ shrinks below this threshold
+ std::atomic<size_t> min_size_{0};
+ // a given operation (load/store) will try to approach this size
+ std::atomic<size_t> target_size_{0};
+ // a store is initiated if the queue_ grows beyond this threshold
+ std::atomic<size_t> max_size_{0};
+
+ MinMaxHeap<SwappedFlowFile, SwappedFlowFileComparator> swapped_flow_files_;
+ // the pending swap-in operation (if any)
+ std::optional<LoadTask> load_task_;
+ MinMaxHeap<value_type, FlowFilePenaltyExpirationComparator> queue_;
+
+ std::shared_ptr<timeutils::SteadyClock>
clock_{std::make_shared<timeutils::SteadyClock>()};
Review Comment:
Changed `clock_` to `unique_ptr`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]