westonpace commented on a change in pull request #11285:
URL: https://github.com/apache/arrow/pull/11285#discussion_r723662097
##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -123,6 +123,12 @@ struct ARROW_DS_EXPORT ScanOptions {
/// Fragment-specific scan options.
std::shared_ptr<FragmentScanOptions> fragment_scan_options;
+ /// Callback which will be run whenever the scanner pauses due to backpressure
+ ///
+ /// This is mostly for debugging & tracing so that the consumer can be notified if
+ /// they are not consuming data quickly enough.
+ std::function<void()> on_paused_callback;
Review comment:
Oops, I left this in. That was an earlier attempt at fixing the test >_<
##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -341,6 +341,9 @@ class ARROW_EXPORT ThreadPool : public Executor {
// tasks are finished.
Status Shutdown(bool wait = true);
+
+ void WaitForIdle();
Review comment:
Yes, I will document that. `ThreadPool` is internal (and I'm not proposing we add `WaitForIdle` to `Executor`), so I hope that is reasonable. CC @pitrou for thoughts. For context: I added `WaitForIdle` to the `ThreadPool` purely to sequence some testing.
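For readers unfamiliar with the idea: a `WaitForIdle` on a pool typically blocks until the task queue is empty and no worker is in the middle of a task. What follows is a hypothetical, standalone `SimplePool` written only to illustrate one way such a method could be built with a mutex and two condition variables. It is not Arrow's `ThreadPool`, and the class name, `Spawn`, and the internal bookkeeping are all assumptions.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal pool for illustration only; not arrow::internal::ThreadPool.
class SimplePool {
 public:
  explicit SimplePool(std::size_t num_threads) {
    for (std::size_t i = 0; i < num_threads; ++i) {
      workers_.emplace_back([this] { WorkerLoop(); });
    }
  }

  // Drains any queued tasks, then joins the workers.
  ~SimplePool() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      shutting_down_ = true;
    }
    task_cv_.notify_all();
    for (auto& worker : workers_) worker.join();
  }

  void Spawn(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      tasks_.push_back(std::move(task));
    }
    task_cv_.notify_one();
  }

  // Blocks until the queue is empty and no worker is running a task.
  // A worker decrements `active_` only after its task returns, so tasks spawned
  // from inside other tasks are waited for as well.
  void WaitForIdle() {
    std::unique_lock<std::mutex> lock(mutex_);
    idle_cv_.wait(lock, [this] { return tasks_.empty() && active_ == 0; });
  }

 private:
  void WorkerLoop() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (true) {
      task_cv_.wait(lock, [this] { return shutting_down_ || !tasks_.empty(); });
      if (tasks_.empty()) return;  // Shutting down and nothing left to run.
      std::function<void()> task = std::move(tasks_.front());
      tasks_.pop_front();
      ++active_;
      lock.unlock();
      task();  // Run outside the lock so tasks can call Spawn().
      lock.lock();
      --active_;
      if (tasks_.empty() && active_ == 0) idle_cv_.notify_all();
    }
  }

  std::mutex mutex_;
  std::condition_variable task_cv_;  // Signals "work available" or shutdown.
  std::condition_variable idle_cv_;  // Signals "pool became idle".
  std::deque<std::function<void()>> tasks_;
  std::size_t active_ = 0;
  bool shutting_down_ = false;
  std::vector<std::thread> workers_;
};
```

In a test you would spawn work, call `WaitForIdle()`, and only then assert on the work's side effects. Tasks are assumed not to throw, since an exception escaping a worker thread would call `std::terminate`.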
##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -123,6 +123,12 @@ struct ARROW_DS_EXPORT ScanOptions {
/// Fragment-specific scan options.
std::shared_ptr<FragmentScanOptions> fragment_scan_options;
+ /// Callback which will be run whenever the scanner pauses due to backpressure
+ ///
+ /// This is mostly for debugging & tracing so that the consumer can be notified if
+ /// they are not consuming data quickly enough.
+ std::function<void()> on_paused_callback;
Review comment:
I've removed this and did some general cleanup of the PR.
##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -1025,13 +1035,20 @@ TEST_F(TestBackpressure, ScanBatchesUnordered) {
EXPECT_OK_AND_ASSIGN(AsyncGenerator<EnumeratedRecordBatch> gen,
scanner->ScanBatchesUnorderedAsync());
ASSERT_FINISHES_OK(gen());
+ // Wait for the thread pool to idle. By this point the scanner should have paused itself.
+ // This helps with timing on slower CI systems where there is only one core and the scanner
+ // might keep that core until it has scanned all the batches, which never gives the sink a
+ // chance to report it is falling behind.
+ GetCpuThreadPool()->WaitForIdle();
+ DeliverAdditionalBatches();
// The exact numbers may be imprecise due to threading but we should pretty quickly read
// up to our backpressure limit and a little above. We should not be able to go too far
// above.
- BusyWait(10, [&] { return TotalBatchesRead() >= kDefaultBackpressureHigh; });
+ ASSERT_TRUE(TotalBatchesRead() >= kDefaultBackpressureHigh);
Review comment:
Fixed.
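The test change replaces a timed `BusyWait` with an explicit synchronization point before asserting on the backpressure counts. The toy below illustrates the same idea with a hypothetical `PausingProducer` (not part of Arrow, and the `high`/`low`/`total` parameters are assumptions): it buffers items until it reaches a high-water mark, pauses itself, and resumes only after a consumer drains the buffer to a low-water mark. Where the Arrow test waits for the CPU thread pool to go idle, this sketch simply exposes its paused state directly, a simplification made to keep the example self-contained.

```cpp
#include <algorithm>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>

// Hypothetical toy producer for illustration only; not part of Arrow.
// It pauses once `high` items are buffered and resumes only after a consumer
// drains the buffer down to `low` (hysteresis, like high/low backpressure marks).
class PausingProducer {
 public:
  PausingProducer(std::size_t high, std::size_t low, std::size_t total)
      : high_(high), low_(low), total_(total), thread_([this] { Run(); }) {}

  ~PausingProducer() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      stop_ = true;
    }
    cv_.notify_all();
    thread_.join();
  }

  // Explicit sync point used instead of polling a counter with a timeout:
  // returns once the producer has paused itself or produced everything.
  void WaitUntilPausedOrDone() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return paused_ || produced_ == total_; });
  }

  // Consumer side: removes up to `n` buffered items, resuming the producer
  // once the buffer is at or below the low-water mark.
  void Consume(std::size_t n) {
    std::lock_guard<std::mutex> lock(mutex_);
    buffered_ -= std::min(n, buffered_);
    if (buffered_ <= low_) paused_ = false;
    cv_.notify_all();
  }

  std::size_t Produced() {
    std::lock_guard<std::mutex> lock(mutex_);
    return produced_;
  }

 private:
  void Run() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (!stop_ && produced_ < total_) {
      if (buffered_ >= high_) {
        paused_ = true;
        cv_.notify_all();  // Wake anyone blocked in WaitUntilPausedOrDone().
        cv_.wait(lock, [this] { return stop_ || !paused_; });
        continue;
      }
      ++produced_;
      ++buffered_;
    }
    cv_.notify_all();  // Wake waiters when production finishes.
  }

  const std::size_t high_;
  const std::size_t low_;
  const std::size_t total_;
  std::mutex mutex_;
  std::condition_variable cv_;
  std::size_t produced_ = 0;
  std::size_t buffered_ = 0;
  bool paused_ = false;
  bool stop_ = false;
  std::thread thread_;  // Declared last so every member above is initialized first.
};
```

With `high = 8` and `low = 2`, the first `WaitUntilPausedOrDone()` returns once 8 items have been produced. Consuming 6 drops the buffer to 2, which resumes the producer, and the next wait returns at 14 produced. Because each assertion runs only after the producer has demonstrably paused, the check does not depend on how quickly a (possibly single-core) CI machine schedules the producer thread.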