[
https://issues.apache.org/jira/browse/MINIFI-338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16096298#comment-16096298
]
ASF GitHub Bot commented on MINIFI-338:
---------------------------------------
Github user benqiu2016 commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/117#discussion_r128773288
--- Diff: libminifi/include/utils/ThreadPool.h ---
@@ -246,15 +349,67 @@ void ThreadPool<T>::startWorkers() {
template<typename T>
void ThreadPool<T>::run_tasks() {
auto waitperiod = std::chrono::milliseconds(1) * 100;
+ uint64_t wait_decay_ = 0;
while (running_.load()) {
+ // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
+ // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
+ // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
+ // be more likely to run. This is intentional.
+ if (wait_decay_ > 1000) {
+ std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
+ }
Worker<T> task;
if (!worker_queue_.try_dequeue(task)) {
+
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
tasks_available_.wait_for(lock, waitperiod);
continue;
}
- task.run();
+ else {
+
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ if (!task_status_[task.getIdentifier()]) {
+ continue;
+ }
+ }
+
+ bool wait_to_run = false;
+ if (task.getTimeSlice() > 1) {
+ auto now = std::chrono::system_clock::now().time_since_epoch();
+ auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now);
+ if (task.getTimeSlice() > ms.count()) {
+ wait_to_run = true;
+ }
+ }
+ // if we have to wait we re-queue the worker.
+ if (wait_to_run) {
+ {
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ if (!task_status_[task.getIdentifier()]) {
+ continue;
+ }
+ }
+ worker_queue_.enqueue(std::move(task));
--- End diff --
OK. It is possible to sort the queue, or otherwise arrange it so that the
head of the queue is always the task that expires first.
In that case we can avoid dequeuing and re-enqueuing every item in the queue
just to find one that is ready to run.
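A minimal sketch of that idea, assuming a min-heap keyed on the time at which
each task becomes runnable. DelayedTask, RunsLater, and DelayedTaskQueue are
hypothetical names used only for illustration; they are not part of
ThreadPool.h, and run_at_ms is assumed to play the role of
task.getTimeSlice(). The sketch is not thread-safe, so a real pool would have
to guard it with a mutex such as worker_queue_mutex_.

```cpp
#include <cstddef>
#include <cstdint>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for Worker<T>: only the fields needed for ordering.
struct DelayedTask {
  std::string id;      // assumed to correspond to task.getIdentifier()
  uint64_t run_at_ms;  // assumed to correspond to task.getTimeSlice()
};

// Comparator that puts the task with the smallest run_at_ms at the top.
struct RunsLater {
  bool operator()(const DelayedTask &a, const DelayedTask &b) const {
    return a.run_at_ms > b.run_at_ms;
  }
};

// Not thread-safe: callers must hold a lock around every member call.
class DelayedTaskQueue {
 public:
  void push(DelayedTask task) { heap_.push(std::move(task)); }

  // Pops the earliest task into `out` only if it is due at `now_ms`.
  // If the head is not due, nothing behind it is due either, so the
  // queue is left untouched and no task has to be re-enqueued.
  bool pop_if_due(uint64_t now_ms, DelayedTask &out) {
    if (heap_.empty() || heap_.top().run_at_ms > now_ms) {
      return false;
    }
    out = heap_.top();
    heap_.pop();
    return true;
  }

  // Milliseconds until the head becomes runnable; 0 if it is already
  // due or the queue is empty. A worker could sleep for this long.
  uint64_t ms_until_next(uint64_t now_ms) const {
    if (heap_.empty() || heap_.top().run_at_ms <= now_ms) {
      return 0;
    }
    return heap_.top().run_at_ms - now_ms;
  }

  std::size_t size() const { return heap_.size(); }

 private:
  std::priority_queue<DelayedTask, std::vector<DelayedTask>, RunsLater> heap_;
};
```

With this layout a worker inspects only the head: if the head is not yet due
it can wait for ms_until_next() instead of cycling through the whole queue.
The trade-off is that the heap needs a lock, whereas the queue in the diff is
accessed through try_dequeue without one.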
> Threads can be unbounded per flow configuration
> -----------------------------------------------
>
> Key: MINIFI-338
> URL: https://issues.apache.org/jira/browse/MINIFI-338
> Project: Apache NiFi MiNiFi
> Issue Type: Bug
> Components: C++
> Reporter: marco polo
> Assignee: marco polo
>
> The number of tasks configured by a given processor should be bounded by a
> thread pool configuration. Currently the schedulers have no concept of a
> thread pool except for the component life cycle thread pool. We should
> transition the tasks to a thread pool that is shared by the scheduler and is
> globally configurable, to better minimize the impact of processors.