[ https://issues.apache.org/jira/browse/MINIFI-338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16095568#comment-16095568 ]
ASF GitHub Bot commented on MINIFI-338:
---------------------------------------
Github user phrocker commented on a diff in the pull request:
https://github.com/apache/nifi-minifi-cpp/pull/117#discussion_r128656760
--- Diff: libminifi/include/utils/ThreadPool.h ---
@@ -246,15 +349,67 @@ void ThreadPool<T>::startWorkers() {
template<typename T>
void ThreadPool<T>::run_tasks() {
auto waitperiod = std::chrono::milliseconds(1) * 100;
+ uint64_t wait_decay_ = 0;
while (running_.load()) {
+ // if we are spinning, perform a wait. If something changes in the worker such that the timeslice has changed, we will pick that information up. Note that it's possible
+ // we could starve for processing time if all workers are waiting. In the event that the number of workers far exceeds the number of threads, threads will spin and potentially
+ // wait until they arrive at a task that can be run. In this case we reset the wait_decay and attempt to pick up a new task. This means that threads that recently ran should
+ // be more likely to run. This is intentional.
+ if (wait_decay_ > 1000) {
+ std::this_thread::sleep_for(std::chrono::nanoseconds(wait_decay_));
+ }
Worker<T> task;
if (!worker_queue_.try_dequeue(task)) {
+
std::unique_lock<std::mutex> lock(worker_queue_mutex_);
tasks_available_.wait_for(lock, waitperiod);
continue;
}
- task.run();
+ else {
+
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ if (!task_status_[task.getIdentifier()]) {
+ continue;
+ }
+ }
+
+ bool wait_to_run = false;
+ if (task.getTimeSlice() > 1) {
+ auto now = std::chrono::system_clock::now().time_since_epoch();
+ auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(now);
+ if (task.getTimeSlice() > ms.count()) {
+ wait_to_run = true;
+ }
+ }
+ // if we have to wait we re-queue the worker.
+ if (wait_to_run) {
+ {
+ std::unique_lock<std::mutex> lock(worker_queue_mutex_);
+ if (!task_status_[task.getIdentifier()]) {
+ continue;
+ }
+ }
+ worker_queue_.enqueue(std::move(task));
--- End diff --
Need? Not really; we could keep running with the same task. The premise is to
re-enqueue it so that something else can be pulled off the queue if another
task exists. If this same task is dequeued again, we run it unless the
timeslice has again said "come back later." Admittedly it's a wasteful use of
the queue, but otherwise we won't know if a task is available after the wait
period.
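
The re-queue trade-off described above can be illustrated with a minimal,
single-threaded sketch. This is not the code from the pull request:
DeferredTask, poll_once, and the std::deque queue are hypothetical stand-ins
for Worker<T>, one iteration of run_tasks(), and the concurrent queue, and the
clock is passed in as a plain millisecond value so the behavior is easy to
follow.

```cpp
#include <cstdint>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for Worker<T>: a callable plus the earliest time
// (in milliseconds, on a caller-supplied clock) at which it may run.
struct DeferredTask {
  std::string name;
  uint64_t not_before_ms;
  std::function<void()> fn;
};

// One iteration of "dequeue, check timeslice, maybe re-queue".
// Returns true if a task ran. Returns false if the queue was empty or the
// task at the front was not yet due; in the latter case the task is pushed
// to the back so that a different task can be tried on the next call.
inline bool poll_once(std::deque<DeferredTask>& queue, uint64_t now_ms) {
  if (queue.empty()) {
    return false;
  }
  DeferredTask task = std::move(queue.front());
  queue.pop_front();
  if (task.not_before_ms > now_ms) {
    // "Come back later": costs one extra queue operation, but lets
    // whatever else is queued get a chance before this task is retried.
    queue.push_back(std::move(task));
    return false;
  }
  task.fn();
  return true;
}
```

With a task that is not yet due at the front and a ready task behind it, the
first call re-queues the early task and the second call runs the ready one. If
the early task is the only one left, each call simply cycles it through the
queue until the supplied time reaches its not_before_ms, which is the "waste"
the comment concedes.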
> Threads can be unbounded per flow configuration
> -----------------------------------------------
>
> Key: MINIFI-338
> URL: https://issues.apache.org/jira/browse/MINIFI-338
> Project: Apache NiFi MiNiFi
> Issue Type: Bug
> Components: C++
> Reporter: marco polo
> Assignee: marco polo
>
> The number of tasks configured by a given processor should be bounded by a
> thread pool configuration. Currently the schedulers have no concept of a
> thread pool except for the component life cycle thread pool. We should
> transition the tasks to a thread pool that is shared by the scheduler and is
> globally configurable, to better minimize the impact of processors.