Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/117#discussion_r128787106
  
    --- Diff: libminifi/include/utils/ThreadPool.h ---
    @@ -246,15 +349,67 @@ void ThreadPool<T>::startWorkers() {
     template<typename T>
     void ThreadPool<T>::run_tasks() {
       auto waitperiod = std::chrono::milliseconds(1) * 100;
    +  uint64_t wait_decay_ = 0;
       while (running_.load()) {
     
    +    // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
    +    // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
    +    // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
    +    // be more likely to run. This is intentional.
    +    if (wait_decay_ > 1000) {
    +      std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
    +    }
         Worker<T> task;
         if (!worker_queue_.try_dequeue(task)) {
    +
           std::unique_lock<std::mutex> lock(worker_queue_mutex_);
           tasks_available_.wait_for(lock, waitperiod);
           continue;
         }
    -    task.run();
    +    else {
    +
    +      std::unique_lock<std::mutex> lock(worker_queue_mutex_);
    +      if (!task_status_[task.getIdentifier()]) {
    +        continue;
    +      }
    +    }
    +
    +    bool wait_to_run = false;
    +    if (task.getTimeSlice() > 1) {
    +      auto now = std::chrono::system_clock::now().time_since_epoch();
    +      auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now);
    +      if (task.getTimeSlice() > ms.count()) {
    +        wait_to_run = true;
    +      }
    +    }
    +    // if we have to wait we re-queue the worker.
    +    if (wait_to_run) {
    +      {
    +        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
    +        if (!task_status_[task.getIdentifier()]) {
    +          continue;
    +        }
    +      }
    +      worker_queue_.enqueue(std::move(task));
    --- End diff --
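    For context, the re-queue decision in the diff above compares the task's
time slice (absolute epoch milliseconds) against the current time. A minimal
standalone sketch of that check, using a hypothetical free function rather
than the actual Worker API:

    ```cpp
    #include <cassert>
    #include <chrono>
    #include <cstdint>

    // Hypothetical standalone version of the quoted check (not the actual
    // ThreadPool code): a time slice of 0 or 1 means "run immediately";
    // otherwise the task must wait if its slice lies in the future.
    static bool should_wait_to_run(uint64_t time_slice_ms) {
      if (time_slice_ms <= 1) {
        return false;
      }
      auto now = std::chrono::system_clock::now().time_since_epoch();
      auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now);
      return time_slice_ms > static_cast<uint64_t>(ms.count());
    }
    ```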
    
    Unfortunately that would require a locking queue, or dequeuing everything
in order to sort. Since it is a lock-free queue, tasks can be enqueued and
dequeued with only a std::move, at relatively low cost. An alternative would
be to make multiple queues, but that would then require a prioritized
strategy for accessing each queue. I think that would be a follow-on
activity if the need arose.
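    As a rough illustration of the multiple-queue alternative mentioned
above, here is a minimal sketch (hypothetical names, a locked std::deque
rather than the lock-free queue) of a strict-priority two-queue strategy;
the access strategy is exactly the part that would need design work, since
strict priority can starve the low-priority queue:

    ```cpp
    #include <deque>
    #include <mutex>
    #include <utility>

    // Hypothetical sketch (not the MiNiFi code): two locked queues with a
    // strict-priority dequeue strategy. Tasks are moved in and out, so the
    // per-operation cost is one std::move plus the lock.
    template<typename T>
    class PriorityTaskQueues {
     public:
      void enqueue(T task, bool high_priority) {
        std::lock_guard<std::mutex> guard(mutex_);
        (high_priority ? high_ : low_).push_back(std::move(task));
      }

      // Moves a task into `out` and returns true if one was available.
      // The high-priority queue is always drained first.
      bool try_dequeue(T &out) {
        std::lock_guard<std::mutex> guard(mutex_);
        std::deque<T> &q = !high_.empty() ? high_ : low_;
        if (q.empty()) {
          return false;
        }
        out = std::move(q.front());
        q.pop_front();
        return true;
      }

     private:
      std::mutex mutex_;
      std::deque<T> high_;
      std::deque<T> low_;
    };
    ```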
    
    The biggest negative thus far is that shutdown takes longer, because we
are now deterministically waiting for threads to finish.
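    That shutdown cost can be seen in a minimal sketch (a hypothetical class,
not the actual ThreadPool): shutdown() flips the running flag, wakes any
sleepers, and joins every worker, so it cannot return before the last thread
has observed the flag or its bounded wait has expired:

    ```cpp
    #include <atomic>
    #include <chrono>
    #include <condition_variable>
    #include <cstddef>
    #include <mutex>
    #include <thread>
    #include <vector>

    // Hypothetical sketch: workers sleep on a condition variable with a
    // bounded wait period, and shutdown() joins all of them, making the
    // teardown deterministic but potentially slow.
    class JoiningPool {
     public:
      explicit JoiningPool(std::size_t n) {
        for (std::size_t i = 0; i < n; ++i) {
          workers_.emplace_back([this] { run(); });
        }
      }

      ~JoiningPool() { shutdown(); }

      void shutdown() {
        running_.store(false);
        tasks_available_.notify_all();
        for (auto &worker : workers_) {
          if (worker.joinable()) {
            worker.join();  // deterministic: wait for each thread to end
          }
        }
      }

      int exited() const { return exited_.load(); }

     private:
      void run() {
        std::unique_lock<std::mutex> lock(worker_queue_mutex_);
        while (running_.load()) {
          // Mirrors the wait in run_tasks(): bounded wait, then re-check.
          tasks_available_.wait_for(lock, std::chrono::milliseconds(10));
        }
        ++exited_;
      }

      std::atomic<bool> running_{true};
      std::atomic<int> exited_{0};
      std::mutex worker_queue_mutex_;
      std::condition_variable tasks_available_;
      std::vector<std::thread> workers_;
    };
    ```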

