westonpace commented on a change in pull request #11285:
URL: https://github.com/apache/arrow/pull/11285#discussion_r723662097



##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -123,6 +123,12 @@ struct ARROW_DS_EXPORT ScanOptions {
   /// Fragment-specific scan options.
   std::shared_ptr<FragmentScanOptions> fragment_scan_options;
 
+  /// Callback which will be run whenever the scanner pauses due to backpressure
+  ///
+  /// This is mostly for debugging & tracing so that the consumer can be notified if
+  /// they are not consuming data quickly enough.
+  std::function<void()> on_paused_callback;

Review comment:
       Oops, I left this in.  That was an earlier attempt at fixing the test >_<

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -341,6 +341,9 @@ class ARROW_EXPORT ThreadPool : public Executor {
   // tasks are finished.
   Status Shutdown(bool wait = true);
 
+
+  void WaitForIdle();

Review comment:
       Yes, I will document that.  `ThreadPool` is internal (and I'm not proposing we add `WaitForIdle` to `Executor`), so I hope that is reasonable.  CC @pitrou for thoughts.  For context: I added `WaitForIdle` to the `ThreadPool` purely for the sake of sequencing some testing.
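For readers unfamiliar with the idea, here is a minimal sketch of how an idle wait can work in a simple pool. `MiniThreadPool`, `Spawn`, and `RunExample` are hypothetical names, and this is not Arrow's `ThreadPool` implementation. It assumes only a mutex-guarded task queue plus a count of in-flight tasks, with a second condition variable that is signalled when both reach zero.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal pool; not Arrow's ThreadPool.
class MiniThreadPool {
 public:
  explicit MiniThreadPool(std::size_t num_threads) {
    for (std::size_t i = 0; i < num_threads; ++i) {
      workers_.emplace_back([this] { WorkerLoop(); });
    }
  }

  ~MiniThreadPool() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      stopping_ = true;
    }
    task_cv_.notify_all();
    for (auto& w : workers_) w.join();  // workers drain the queue, then exit
  }

  void Spawn(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      tasks_.push(std::move(task));
    }
    task_cv_.notify_one();
  }

  // Blocks until no task is queued and no worker is running one. A task that
  // spawns a follow-up queues it while still counted as active, so the pool
  // cannot look idle between the two.
  void WaitForIdle() {
    std::unique_lock<std::mutex> lock(mutex_);
    idle_cv_.wait(lock, [this] { return tasks_.empty() && active_ == 0; });
  }

 private:
  void WorkerLoop() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (true) {
      task_cv_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
      if (tasks_.empty()) return;  // stopping and nothing left to run
      std::function<void()> task = std::move(tasks_.front());
      tasks_.pop();
      ++active_;
      lock.unlock();
      task();  // run without holding the lock so tasks may call Spawn()
      lock.lock();
      --active_;
      if (tasks_.empty() && active_ == 0) idle_cv_.notify_all();
    }
  }

  std::mutex mutex_;
  std::condition_variable task_cv_;
  std::condition_variable idle_cv_;
  std::queue<std::function<void()>> tasks_;
  std::vector<std::thread> workers_;
  std::size_t active_ = 0;
  bool stopping_ = false;
};

// Example: n tasks, each spawning one follow-up; wait until all have run.
int RunExample(int n) {
  MiniThreadPool pool(2);
  std::atomic<int> done{0};
  for (int i = 0; i < n; ++i) {
    pool.Spawn([&pool, &done] {
      done.fetch_add(1);
      pool.Spawn([&done] { done.fetch_add(1); });
    });
  }
  pool.WaitForIdle();
  return done.load();
}
```

`RunExample(50)` returns 100: each top-level task queues its follow-up while it is still counted as active, so `WaitForIdle` does not return until the follow-ups have finished too.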

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -123,6 +123,12 @@ struct ARROW_DS_EXPORT ScanOptions {
   /// Fragment-specific scan options.
   std::shared_ptr<FragmentScanOptions> fragment_scan_options;
 
+  /// Callback which will be run whenever the scanner pauses due to backpressure
+  ///
+  /// This is mostly for debugging & tracing so that the consumer can be notified if
+  /// they are not consuming data quickly enough.
+  std::function<void()> on_paused_callback;

Review comment:
       I've removed this and did some general cleanup of the PR.

##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -1025,13 +1035,20 @@ TEST_F(TestBackpressure, ScanBatchesUnordered) {
   EXPECT_OK_AND_ASSIGN(AsyncGenerator<EnumeratedRecordBatch> gen,
                        scanner->ScanBatchesUnorderedAsync());
   ASSERT_FINISHES_OK(gen());
+  // Wait for the thread pool to idle.  By this point the scanner should have paused itself.
+  // This helps with timing on slower CI systems where there is only one core and the scanner
+  // might keep that core until it has scanned all the batches, which never gives the sink a
+  // chance to report it is falling behind.
+  GetCpuThreadPool()->WaitForIdle();
+  DeliverAdditionalBatches();
 
   // The exact numbers may be imprecise due to threading but we should pretty quickly read
   // up to our backpressure limit and a little above.  We should not be able to go too far
   // above.
-  BusyWait(10, [&] { return TotalBatchesRead() >= kDefaultBackpressureHigh; });
+  ASSERT_TRUE(TotalBatchesRead() >= kDefaultBackpressureHigh);

Review comment:
       Fixed.
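The assertion on `kDefaultBackpressureHigh` relies on the scanner pausing once roughly that many batches are buffered. Below is a single-threaded sketch of high/low watermark backpressure. It assumes a hypothetical `BackpressureBuffer` class with illustrative thresholds and is not Arrow's scanner code: the producer stops once the buffer reaches the high mark and only resumes after the consumer drains it to the low mark. Because nothing here is concurrent, the counts are exact, unlike the threaded test, where a few in-flight batches can land slightly above the limit.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical sketch of a buffer between a producer (e.g. a scanner) and a
// consumer (e.g. a sink). Names and thresholds are illustrative only.
class BackpressureBuffer {
 public:
  BackpressureBuffer(std::size_t low, std::size_t high) : low_(low), high_(high) {}

  // Producer side: rejected (returns false) while paused.
  bool TryProduce(int batch) {
    if (paused_) return false;
    buffer_.push_back(batch);
    ++total_produced_;
    if (buffer_.size() >= high_) paused_ = true;  // hit the high watermark
    return true;
  }

  // Consumer side: takes the oldest batch; resumes the producer once the
  // buffer has drained to the low watermark.
  bool TryConsume(int* out) {
    if (buffer_.empty()) return false;
    *out = buffer_.front();
    buffer_.pop_front();
    if (paused_ && buffer_.size() <= low_) paused_ = false;
    return true;
  }

  bool paused() const { return paused_; }
  std::size_t total_produced() const { return total_produced_; }

 private:
  std::size_t low_;
  std::size_t high_;
  std::deque<int> buffer_;
  std::size_t total_produced_ = 0;
  bool paused_ = false;
};
```

With `low = 2` and `high = 8` and no consumer, only 8 of 100 attempted batches are accepted. The producer is resumed after the sixth batch is consumed, when the buffer is back down to 2.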




-- 
This is an automated message from the Apache Git Service.