westonpace commented on a change in pull request #10401:
URL: https://github.com/apache/arrow/pull/10401#discussion_r648833516



##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -321,37 +335,139 @@ class ARROW_EXPORT ThreadPool : public Executor {
   // tasks are finished.
   Status Shutdown(bool wait = true);
 
-  struct State;
+  // ------------- Statistics API ---------------
+
+  /// The current number of tasks either currently running or in the queue to run
+  uint64_t NumTasksRunningOrQueued() const;
+  /// A guess at the maximum number of tasks running or queued at any one point
+  uint64_t MaxTasksQueued() const;
+  /// The total number of tasks that have been submitted over the lifetime of the pool
+  uint64_t TotalTasksQueued() const;
+
+  // ------------- Children API -----------------
+
+  /// Called by children when a worker thread completes a task
+  void RecordFinishedTask();
+  /// True if the thread pool is shutting down, should only be checked if a thread has
+  /// no tasks to work on.  This allows us to ensure we drain the task queue before
+  /// shutting down the pool.
+  ///
+  /// Once this returns true a thread must not call it (or ShouldWorkerQuitNow) again
+  bool ShouldWorkerQuit(ThreadIt* thread_it);
+  /// True if the thread is no longer needed (e.g. excess capacity) or if a quick shutdown
+  /// has been requested.  Should be checked frequently as threads can quit with remaining
+  /// work if this is true
+  ///
+  /// Once this returns true a thread must not call it (or ShouldWorkerQuit) again
+  bool ShouldWorkerQuitNow(ThreadIt* thread_it);
+  /// Should be called first by a worker thread as soon as it starts up.  Until this call
+  /// finishes `thread_it` will not have a valid value
+  void WaitForReady();
+  /// Called by a child implementation when new work arrives that should wake up idle
+  /// threads.  This will notify one worker waiting on WaitForWork.  Generally called
+  /// in DoSubmitTask but might be called less often if a child implementation wants to.
+  void NotifyIdleWorker();
+  /// Called by a worker thread that is ready to wait for work to arrive
+  void WaitForWork();
+
+  struct Control;
 
  protected:
+  FRIEND_TEST(TestThreadPool, DestroyWithoutShutdown);
   FRIEND_TEST(TestThreadPool, SetCapacity);
   FRIEND_TEST(TestGlobalThreadPool, Capacity);
   friend ARROW_EXPORT ThreadPool* GetCpuThreadPool();
 
-  ThreadPool();
+  explicit ThreadPool(bool eternal = false);
 
   Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
                    StopCallback&&) override;
 
+  /// Called on the child process after a fork.  After a fork all threads will have ceased
+  /// running in the child process.  This method should clean up the thread pool state and
+  /// restart any previously running threads.
+  ///
+  /// The behavior is somewhat ill-defined if tasks are running when the fork happened.
+  /// For more details see ARROW-12879
+  virtual void ResetAfterFork();
+
+  /// Launches a worker thread
+  virtual std::shared_ptr<Thread> LaunchWorker(ThreadIt thread_it) = 0;
+  /// Adds a task to the task queue(s)
+  virtual void DoSubmitTask(TaskHints hints, Task task) = 0;
+  /// Should return true only if there is no work to be done
+  virtual bool Empty() = 0;
   // Collect finished worker threads, making sure the OS threads have exited
   void CollectFinishedWorkersUnlocked();
   // Launch a given number of additional workers
   void LaunchWorkersUnlocked(int threads);
+  // Marks a thread finished and removes it from the workers list
+  void MarkThreadFinishedUnlocked(ThreadIt* thread_it);
   // Get the current actual capacity
-  int GetActualCapacity();
+  int GetActualCapacity() const;
+  // Get the amount of threads we could still launch based on capacity and # of tasks
+  int GetAdditionalThreadsNeeded() const;
   // Reinitialize the thread pool if the pid changed
   void ProtectAgainstFork();
+  void RecordTaskSubmitted();
 
   static std::shared_ptr<ThreadPool> MakeCpuThreadPool();
 
-  std::shared_ptr<State> sp_state_;
-  State* state_;
+  std::atomic<uint64_t> num_tasks_running_;

Review comment:
       I've made a stab at this (at the moment the implementation is called
`WorkerControl`, but I could be convinced to call it `ThreadPoolImpl`), but it
doesn't avoid the thread wrapper because of inheritance.  If `ThreadPool` is a
base class that other implementations can extend, then it has a pure virtual
`LaunchWorker` that has to return "something that can be joined", which is
what the `Thread` wrapper is for.  Options:
   
    * Keep the `Thread` wrapper.
    * Just include `<thread>` in `thread_pool.h`.
    * Make the pimpl class extensible instead of the public class.  As far as I
can tell, this would force all future child implementations to live in
`thread_pool.cc`, though it would simplify the logic in other places.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to