Github user phrocker commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/117#discussion_r128656760
--- Diff: libminifi/include/utils/ThreadPool.h ---
@@ -246,15 +349,67 @@ void ThreadPool<T>::startWorkers() {
template<typename T>
void ThreadPool<T>::run_tasks() {
auto waitperiod = std::chrono::milliseconds(1) * 100;
+ uint64_t wait_decay_ = 0;
while (running_.load()) {
+ // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
+ // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
+ // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
+ // be more likely to run. This is intentional.
+ if (wait_decay_ > 1000) {
+ std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
+ }
Worker<T> task;
if (!worker_queue_.try_dequeue(task)) {
+
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
tasks_available_.wait_for(lock, waitperiod);
continue;
}
- task.run();
+ else {
+
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ if (!task_status_[task.getIdentifier()]) {
+ continue;
+ }
+ }
+
+ bool wait_to_run = false;
+ if (task.getTimeSlice() > 1) {
+ auto now = std::chrono::system_clock::now().time_since_epoch();
+ auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now);
+ if (task.getTimeSlice() > ms.count()) {
+ wait_to_run = true;
+ }
+ }
+ // if we have to wait we re-queue the worker.
+ if (wait_to_run) {
+ {
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ if (!task_status_[task.getIdentifier()]) {
+ continue;
+ }
+ }
+ worker_queue_.enqueue(std::move(task));
--- End diff --
Need? Not really; we could keep running with the same task. The premise is to
re-enqueue so that, if another task exists, something else can be pulled off
the queue instead. If this one is dequeued again, we run it unless the
timeslice again says "come back later." Admittedly it's a wasted queue
operation, but we won't know whether a task is available after the wait period.
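
To make the premise concrete, here is a rough, single-threaded sketch of the
re-queue idea. It is not the code in this PR: SketchTask and run_one_pass are
hypothetical names, a std::deque stands in for the concurrent worker_queue_,
and the current time is passed in as a plain number rather than read from
std::chrono::system_clock, so the behavior is easy to follow.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for Worker<T>: a name plus the earliest time
// (in milliseconds) at which the task is allowed to run.
struct SketchTask {
  std::string name;
  uint64_t time_slice_ms;  // 0 means "run immediately"
};

// Makes one pass over the queue at a fixed "now". A task whose time slice is
// still in the future goes back to the tail so that another task can be
// pulled off instead; a due task is "run" (here, just recorded by name).
// Returns the names of the tasks that ran, in the order they ran.
std::vector<std::string> run_one_pass(std::deque<SketchTask> &queue, uint64_t now_ms) {
  std::vector<std::string> ran;
  const std::size_t pending = queue.size();  // visit each queued task once
  for (std::size_t i = 0; i < pending; ++i) {
    SketchTask task = std::move(queue.front());
    queue.pop_front();
    if (task.time_slice_ms > now_ms) {
      // "Come back later": re-queue rather than hold on to this task.
      queue.push_back(std::move(task));
      continue;
    }
    ran.push_back(task.name);
  }
  return ran;
}
```

With a queue of {"late", 500}, {"ready", 0}, {"soon", 100} and now_ms = 100,
"late" is pushed back while "ready" and "soon" run. The extra enqueue is the
cost; the benefit is that a due task sitting behind a waiting one is not held
up.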