GitHub user phrocker commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/117#discussion_r128787106
--- Diff: libminifi/include/utils/ThreadPool.h ---
@@ -246,15 +349,67 @@ void ThreadPool<T>::startWorkers() {
template<typename T>
void ThreadPool<T>::run_tasks() {
auto waitperiod = std::chrono::milliseconds(1) * 100;
+ uint64_t wait_decay_ = 0;
while (running_.load()) {
+ // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
+ // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
+ // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
+ // be more likely to run. This is intentional.
+ if (wait_decay_ > 1000) {
+ std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
+ }
Worker<T> task;
if (!worker_queue_.try_dequeue(task)) {
+
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
tasks_available_.wait_for(lock, waitperiod);
continue;
}
- task.run();
+ else {
+
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ if (!task_status_[task.getIdentifier()]) {
+ continue;
+ }
+ }
+
+ bool wait_to_run = false;
+ if (task.getTimeSlice() > 1) {
+ auto now = std::chrono::system_clock::now().time_since_epoch();
+ auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now);
+ if (task.getTimeSlice() > ms.count()) {
+ wait_to_run = true;
+ }
+ }
+ // if we have to wait we re-queue the worker.
+ if (wait_to_run) {
+ {
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ if (!task_status_[task.getIdentifier()]) {
+ continue;
+ }
+ }
+ worker_queue_.enqueue(std::move(task));
--- End diff --
Unfortunately that would require a locking queue, or dequeuing everything in
order to sort. Since it is a lock-free queue, tasks can be enqueued and dequeued
with only a std::move, at relatively low cost. An alternative would be to
maintain multiple queues, but that would then require a prioritized strategy for
accessing each queue. I think that would be a follow-on activity if the need
arose.
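
For illustration, a rough sketch of what that multi-queue alternative might
look like. This is not code from this PR; the class name, the fixed number of
priority levels, and the scan-in-order strategy are all hypothetical, and it
assumes the moodycamel::ConcurrentQueue API the pool already uses:

```cpp
#include <array>
#include <cstddef>
#include <utility>

#include "concurrentqueue.h"  // moodycamel::ConcurrentQueue (already a ThreadPool dependency)

// Hypothetical sketch: one lock-free queue per priority level.
template<typename T>
class PrioritizedQueues {
 public:
  static constexpr std::size_t kLevels = 3;  // assumed fixed number of priorities

  void enqueue(T &&task, std::size_t level) {
    // Still just a std::move onto a lock-free queue; no sorting, no lock.
    queues_[level].enqueue(std::move(task));
  }

  // The "prioritized strategy" mentioned above: scan queues from highest
  // priority down. Note this can starve low-priority tasks, which is part of
  // why it would be a follow-on activity rather than part of this change.
  bool try_dequeue(T &task) {
    for (auto &queue : queues_) {
      if (queue.try_dequeue(task))
        return true;
    }
    return false;
  }

 private:
  std::array<moodycamel::ConcurrentQueue<T>, kLevels> queues_;
};
```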
The biggest negative so far is that shutdown takes longer, because we now
deterministically wait for threads to end.
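
For context, that shutdown behavior amounts to something like the following
simplified sketch. The names running_ and tasks_available_ come from the diff
above; thread_queue_ is an assumed member name, and this is not the exact code:

```cpp
#include <atomic>
#include <condition_variable>
#include <thread>
#include <vector>

// Simplified sketch of deterministic shutdown: flip the running flag, wake
// every worker blocked in wait_for, then join each thread. The joins are
// what make shutdown slower, but they guarantee no thread is mid-task when
// shutdown() returns.
void shutdown(std::atomic<bool> &running_,
              std::condition_variable &tasks_available_,
              std::vector<std::thread> &thread_queue_) {
  running_.store(false);
  tasks_available_.notify_all();
  for (auto &thread : thread_queue_) {
    if (thread.joinable())
      thread.join();  // wait for the worker to observe running_ == false
  }
}
```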