lidavidm commented on a change in pull request #11285:
URL: https://github.com/apache/arrow/pull/11285#discussion_r723660021
##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -341,6 +341,9 @@ class ARROW_EXPORT ThreadPool : public Executor {
// tasks are finished.
Status Shutdown(bool wait = true);
+
+ void WaitForIdle();
Review comment:
As with the scanner option, should we document that this is primarily
for testing?
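
For readers following along, here is a minimal sketch of the idle-wait pattern that `WaitForIdle()` appears to rely on, judging from the `tasks_queued_or_running_` counter and `cv_idle_` condition variable visible in the `thread_pool.cc` hunk of this PR. `IdleTracker`, `TaskQueued`, `TaskFinished`, and `Pending` are hypothetical names made up for illustration; this is not Arrow's actual `ThreadPool` implementation.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for the pool's shared state (not Arrow's real class).
class IdleTracker {
 public:
  // Called when a task is submitted to the pool.
  void TaskQueued() {
    std::lock_guard<std::mutex> lock(mutex_);
    ++tasks_queued_or_running_;
  }

  // Called by a worker once a task has finished running.
  void TaskFinished() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (--tasks_queued_or_running_ == 0) {
      // Wake every thread blocked in WaitForIdle().
      cv_idle_.notify_all();
    }
  }

  // Blocks until no task is queued or running.
  void WaitForIdle() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_idle_.wait(lock, [this] { return tasks_queued_or_running_ == 0; });
  }

  // Number of tasks currently queued or running.
  int Pending() {
    std::lock_guard<std::mutex> lock(mutex_);
    return tasks_queued_or_running_;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_idle_;
  int tasks_queued_or_running_ = 0;
};
```

A helper like this only tells the caller that the pool was idle at some instant; new tasks may be submitted immediately afterwards, which is one reason to document it as primarily a testing aid.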
##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -123,6 +123,12 @@ struct ARROW_DS_EXPORT ScanOptions {
/// Fragment-specific scan options.
std::shared_ptr<FragmentScanOptions> fragment_scan_options;
+ /// Callback which will be run whenever the scanner pauses due to backpressure
+ ///
+ /// This is mostly for debugging & tracing so that the consumer can be notified if
+ /// they are not consuming data quickly enough.
+ std::function<void()> on_paused_callback;
Review comment:
This doesn't seem used here.
##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -1025,13 +1035,20 @@ TEST_F(TestBackpressure, ScanBatchesUnordered) {
EXPECT_OK_AND_ASSIGN(AsyncGenerator<EnumeratedRecordBatch> gen,
scanner->ScanBatchesUnorderedAsync());
ASSERT_FINISHES_OK(gen());
+ // Wait for the thread pool to idle. By this point the scanner should have paused itself
+ // This helps with timing on slower CI systems where there is only one core and the scanner
+ // might keep that core until it has scanned all the batches which never gives the sink a
+ // chance to report it is falling behind.
+ GetCpuThreadPool()->WaitForIdle();
+ DeliverAdditionalBatches();
// The exact numbers may be imprecise due to threading but we should pretty quickly read
// up to our backpressure limit and a little above. We should not be able to go too far
// above.
- BusyWait(10, [&] { return TotalBatchesRead() >= kDefaultBackpressureHigh; });
+ ASSERT_TRUE(TotalBatchesRead() >= kDefaultBackpressureHigh);
Review comment:
nit: ASSERT_GE? Here and below.
##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -123,6 +123,12 @@ struct ARROW_DS_EXPORT ScanOptions {
/// Fragment-specific scan options.
std::shared_ptr<FragmentScanOptions> fragment_scan_options;
+ /// Callback which will be run whenever the scanner pauses due to backpressure
+ ///
+ /// This is mostly for debugging & tracing so that the consumer can be notified if
+ /// they are not consuming data quickly enough.
+ std::function<void()> on_paused_callback;
Review comment:
Thanks! Looks good overall.
##########
File path: cpp/src/arrow/util/thread_pool.cc
##########
@@ -183,9 +183,10 @@ static void WorkerLoop(std::shared_ptr<ThreadPool::State> state,
ARROW_UNUSED(std::move(task)); // release resources before waiting for lock
lock.lock();
}
- if ARROW_PREDICT_FALSE(--state->tasks_queued_or_running_ == 0) {
- state->cv_idle_.notify_all();
- }
+ if
+ ARROW_PREDICT_FALSE(--state->tasks_queued_or_running_ == 0) {
Review comment:
Looks like a set of parentheses is missing here? (It seems to work because
the macro definition supplies its own set of parens, but usually the condition
is still wrapped in explicit parentheses.)
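
To illustrate the point, here is a small sketch with a hypothetical macro, `MY_PREDICT_FALSE`, whose expansion carries its own outer parentheses. It is assumed to resemble how branch-prediction macros are commonly defined and is not copied from Arrow's headers. Both forms compile and behave the same, but the unwrapped one tends to confuse formatters, which may be why the hunk above was split across two lines.

```cpp
#include <cassert>

// Hypothetical macro for illustration; the expansion is already parenthesized.
#if defined(__GNUC__)
#define MY_PREDICT_FALSE(x) (__builtin_expect(!!(x), 0))
#else
#define MY_PREDICT_FALSE(x) (x)
#endif

// Unwrapped form: compiles only because the macro expands to "( ... )".
bool ReachesZeroUnwrapped(int counter) {
  if MY_PREDICT_FALSE(--counter == 0) {
    return true;
  }
  return false;
}

// Wrapped form: still valid if the macro ever loses its outer parentheses,
// and it reads like an ordinary if statement to both people and tools.
bool ReachesZeroWrapped(int counter) {
  if (MY_PREDICT_FALSE(--counter == 0)) {
    return true;
  }
  return false;
}
```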
##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -1026,6 +1026,14 @@ class TestBackpressure : public ::testing::Test {
return sum;
}
+ void Finish(AsyncGenerator<EnumeratedRecordBatch> gen) {
+ for (const auto& controlled_fragment : controlled_fragments_) {
+ controlled_fragment->Finish();
+ }
+ ASSERT_FINISHES_OK(VisitAsyncGenerator(
+ gen, [](EnumeratedRecordBatch batch) { return Status::OK(); }));
+ }
+
Review comment:
Hmm, I would expect dropping the generator/scanner to clean up
everything. Wonder what's keeping it alive. (Not a problem to solve here.)