MINIFI-338: Improve wait decay per pull request comments
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/9d500354 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/9d500354 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/9d500354 Branch: refs/heads/master Commit: 9d500354a3a4c5538ee425162482cb5e8af1bf00 Parents: 97c8a7f Author: Marc <[email protected]> Authored: Thu Jul 20 19:28:26 2017 -0400 Committer: Marc Parisi <[email protected]> Committed: Mon Jul 31 21:12:51 2017 -0400 ---------------------------------------------------------------------- libminifi/include/utils/ThreadPool.h | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9d500354/libminifi/include/utils/ThreadPool.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h index 8ff3975..5335c81 100644 --- a/libminifi/include/utils/ThreadPool.h +++ b/libminifi/include/utils/ThreadPool.h @@ -129,6 +129,10 @@ class Worker { virtual uint64_t getTimeSlice() { return time_slice_; } + + virtual uint64_t getWaitTime(){ + return run_determinant_->wait_time(); + } Worker<T>(const Worker<T>&) = delete; Worker<T>& operator =(const Worker<T>&) = delete; @@ -352,11 +356,19 @@ void ThreadPool<T>::run_tasks() { uint64_t wait_decay_ = 0; while (running_.load()) { + // if we exceed 500ms of wait due to not being able to run any tasks and there are tasks available, meaning + // they are eligible to run per the fact that the thread pool isn't shut down and the tasks are in a runnable state + // BUT they've been continually timesliced, we will lower the wait decay to 100ms and continue incrementing from + // there. This ensures we don't have arbitrarily long sleep cycles. + if (wait_decay_ > 500000000L){ + wait_decay_ = 100000000L; + } // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should // be more likely to run. This is intentional. - if (wait_decay_ > 1000) { + + if (wait_decay_ > 2000) { std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_)); } Worker<T> task; @@ -376,9 +388,12 @@ void ThreadPool<T>::run_tasks() { bool wait_to_run = false; if (task.getTimeSlice() > 1) { + double wt = (double)task.getWaitTime(); auto now = std::chrono::system_clock::now().time_since_epoch(); - auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now); - if (task.getTimeSlice() > ms.count()) { + auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now).count(); + // if our differential is < 10% of the wait time we will not put the task into a wait state + // since requeuing will break the time slice contract. + if (task.getTimeSlice() > ms && (task.getTimeSlice() - ms) > (wt*.10)) { wait_to_run = true; } } @@ -392,7 +407,7 @@ void ThreadPool<T>::run_tasks() { } worker_queue_.enqueue(std::move(task)); - wait_decay_ += 100; + wait_decay_ += 25; continue; }
