lidavidm commented on a change in pull request #11285:
URL: https://github.com/apache/arrow/pull/11285#discussion_r723660021



##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -341,6 +341,9 @@ class ARROW_EXPORT ThreadPool : public Executor {
   // tasks are finished.
   Status Shutdown(bool wait = true);
 
+
+  void WaitForIdle();

Review comment:
       As with the scanner option, should we document that this is primarily 
for testing?
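For readers following along, here is a minimal sketch of what an idle wait of this kind could look like, including a doc comment that flags it as a testing aid. The class `IdleTracker` and its methods are hypothetical; only the member names `tasks_queued_or_running_` and `cv_idle_` come from the `thread_pool.cc` hunk in this review, and the real `ThreadPool` may well differ.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for the pool's shared state (not Arrow's class).
class IdleTracker {
 public:
  // Called when a task is submitted.
  void TaskQueued() {
    std::lock_guard<std::mutex> lock(mutex_);
    ++tasks_queued_or_running_;
  }

  // Called by a worker once a task has finished running.
  void TaskFinished() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (--tasks_queued_or_running_ == 0) {
      cv_idle_.notify_all();  // wake every thread blocked in WaitForIdle()
    }
  }

  /// Block until no tasks are queued or running.
  ///
  /// Primarily intended for testing: new tasks may be queued immediately
  /// after this returns, so "idle" only describes a single moment in time.
  void WaitForIdle() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_idle_.wait(lock, [this] { return tasks_queued_or_running_ == 0; });
  }

  // Number of tasks currently queued or running.
  int pending() {
    std::lock_guard<std::mutex> lock(mutex_);
    return tasks_queued_or_running_;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_idle_;
  int tasks_queued_or_running_ = 0;
};
```

The predicate form of `wait` guards against spurious wakeups, and it returns immediately when the counter is already zero.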

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -123,6 +123,12 @@ struct ARROW_DS_EXPORT ScanOptions {
   /// Fragment-specific scan options.
   std::shared_ptr<FragmentScanOptions> fragment_scan_options;
 
+  /// Callback which will be run whenever the scanner pauses due to backpressure
+  ///
+  /// This is mostly for debugging & tracing so that the consumer can be notified if
+  /// they are not consuming data quickly enough.
+  std::function<void()> on_paused_callback;

Review comment:
       This doesn't seem used here.
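To illustrate what "used" might mean for this option, here is a rough sketch of a producer-side check that fires the callback when it decides to pause. Everything except the field name `on_paused_callback` is an assumption made for illustration: `SketchScanOptions`, `MaybePause`, and the `high_mark` parameter are hypothetical and not part of Arrow.

```cpp
#include <cstddef>
#include <functional>

// Hypothetical, simplified options struct; only the field name mirrors the diff.
struct SketchScanOptions {
  std::function<void()> on_paused_callback;
};

// Hypothetical producer-side check: returns true if the producer should pause
// because the queue has reached the high-water mark.
bool MaybePause(const SketchScanOptions& options, std::size_t queued,
                std::size_t high_mark) {
  if (queued < high_mark) {
    return false;  // consumer is keeping up, keep producing
  }
  // An empty std::function must not be called, so check it first.
  if (options.on_paused_callback) {
    options.on_paused_callback();
  }
  return true;
}
```

A test could then set the callback to increment a counter and assert on how many pauses were observed.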

##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -1025,13 +1035,20 @@ TEST_F(TestBackpressure, ScanBatchesUnordered) {
   EXPECT_OK_AND_ASSIGN(AsyncGenerator<EnumeratedRecordBatch> gen,
                        scanner->ScanBatchesUnorderedAsync());
   ASSERT_FINISHES_OK(gen());
+  // Wait for the thread pool to idle.  By this point the scanner should have paused itself
+  // This helps with timing on slower CI systems where there is only one core and the scanner
+  // might keep that core until it has scanned all the batches which never gives the sink a
+  // chance to report it is falling behind.
+  GetCpuThreadPool()->WaitForIdle();
+  DeliverAdditionalBatches();
 
   // The exact numbers may be imprecise due to threading but we should pretty quickly read
   // up to our backpressure limit and a little above.  We should not be able to go too far
   // above.
-  BusyWait(10, [&] { return TotalBatchesRead() >= kDefaultBackpressureHigh; });
+  ASSERT_TRUE(TotalBatchesRead() >= kDefaultBackpressureHigh);

Review comment:
       nit: ASSERT_GE? Here and below.

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -123,6 +123,12 @@ struct ARROW_DS_EXPORT ScanOptions {
   /// Fragment-specific scan options.
   std::shared_ptr<FragmentScanOptions> fragment_scan_options;
 
+  /// Callback which will be run whenever the scanner pauses due to backpressure
+  ///
+  /// This is mostly for debugging & tracing so that the consumer can be notified if
+  /// they are not consuming data quickly enough.
+  std::function<void()> on_paused_callback;

Review comment:
       Thanks! Looks good overall.

##########
File path: cpp/src/arrow/util/thread_pool.cc
##########
@@ -183,9 +183,10 @@ static void WorkerLoop(std::shared_ptr<ThreadPool::State> state,
         ARROW_UNUSED(std::move(task));  // release resources before waiting for lock
         lock.lock();
       }
-      if ARROW_PREDICT_FALSE(--state->tasks_queued_or_running_ == 0) {
-        state->cv_idle_.notify_all();
-      }
+      if
+        ARROW_PREDICT_FALSE(--state->tasks_queued_or_running_ == 0) {

Review comment:
       Looks like a set of parentheses is missing here? (Seems like it works 
because the macro definition has a set of parens, but usually this still gets 
wrapped)
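A small sketch of why the unwrapped form still compiles, under the assumption that the macro expands to a parenthesized expression as the comment suggests. `SKETCH_PREDICT_FALSE` is a hypothetical stand-in defined here for illustration, not Arrow's actual `ARROW_PREDICT_FALSE`.

```cpp
// Hypothetical stand-in macro; assumed to carry its own outer parentheses.
#if defined(__GNUC__) || defined(__clang__)
#define SKETCH_PREDICT_FALSE(x) (__builtin_expect(!!(x), 0))
#else
#define SKETCH_PREDICT_FALSE(x) (x)
#endif

// Compiles only because the macro expansion supplies the parentheses that
// an `if` condition requires.
bool ReachedZeroUnwrapped(int counter) {
  if SKETCH_PREDICT_FALSE(--counter == 0) {
    return true;
  }
  return false;
}

// Conventional spelling: valid regardless of how the macro is defined, and
// formatters treat it as an ordinary `if`.
bool ReachedZeroWrapped(int counter) {
  if (SKETCH_PREDICT_FALSE(--counter == 0)) {
    return true;
  }
  return false;
}
```

Both functions behave identically; the wrapped form simply does not depend on the macro's definition and avoids the odd line break seen in the hunk.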

##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -1026,6 +1026,14 @@ class TestBackpressure : public ::testing::Test {
     return sum;
   }
 
+  void Finish(AsyncGenerator<EnumeratedRecordBatch> gen) {
+    for (const auto& controlled_fragment : controlled_fragments_) {
+      controlled_fragment->Finish();
+    }
+    ASSERT_FINISHES_OK(VisitAsyncGenerator(
+        gen, [](EnumeratedRecordBatch batch) { return Status::OK(); }));
+  }
+

Review comment:
       Hmm, I would expect dropping the generator/scanner to clean up 
everything. Wonder what's keeping it alive. (Not a problem to solve here.)



