lidavidm commented on a change in pull request #11294:
URL: https://github.com/apache/arrow/pull/11294#discussion_r731284492
##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -422,6 +422,42 @@ def test_scanner(dataset, dataset_reader):
     assert table.num_rows == scanner.count_rows()
+def test_scanner_backpressure():
+    batch = pa.record_batch([pa.array([1, 2, 3])], names=['data'])
+    num_read = 0
+    min_read = 64  # From kDefaultBackpressureHigh in scanner.h
+    end = 200
+
+    def counting_generator():
+        nonlocal num_read
+        while num_read < end:
+            time.sleep(0.01)
+            num_read += 1
+            yield batch
+
+    scanner = ds.Scanner.from_batches(
+        counting_generator(), batch.schema, use_threads=True, use_async=True)
+
+    _batch_iter = scanner.to_batches()  # This line starts the scan
+
+    start = time.time()
+
+    def duration():
+        return time.time() - start
+
+    last_num_read = 0
+    backpressure_probably_hit = False
+    while duration() < 10:
+        if num_read > min_read and num_read == last_num_read:
+            backpressure_probably_hit = True
+            break
+        last_num_read = num_read
+        time.sleep(0.5)
Review comment:
I suppose we have a good number of these now, but timing-based tests
always make me feel somewhat icky. Is there value to testing this in Python as
well as C++?
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -711,6 +711,26 @@ AsyncGenerator<T> MakeSerialReadaheadGenerator(AsyncGenerator<T> source_generato
   return SerialReadaheadGenerator<T>(std::move(source_generator), max_readahead);
 }
+template <typename T>
Review comment:
nit: add a doc comment like for the others? This is an "eagerly
evaluated" ("hot" I think ReactiveX calls it?) generator?
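For readers unfamiliar with the terminology, here is a minimal Python sketch of the cold/hot distinction the comment draws. `cold_source` and `HotGenerator` are hypothetical names, and a thread plus an unbounded `queue.Queue` stand in for whatever executor and buffering the C++ generator actually uses; it illustrates the concept, not the PR's code.

```python
import queue
import threading

_DONE = object()  # end-of-stream marker


def cold_source(log):
    """Lazy ("cold"): an item is produced only when the consumer pulls it."""
    for i in range(3):
        log.append(f"produce {i}")
        yield i


class HotGenerator:
    """Eager ("hot"): a background thread starts draining `source` as soon as
    the wrapper is constructed and buffers results until they are pulled.

    Simplifications: the buffer is unbounded (no backpressure), and an
    exception raised by `source` is not forwarded to the consumer.
    """

    def __init__(self, source):
        self._queue = queue.Queue()
        self._finished = False
        self._thread = threading.Thread(target=self._run, args=(source,), daemon=True)
        self._thread.start()  # work begins here, before anyone calls next()

    def _run(self, source):
        for item in source:
            self._queue.put(item)
        self._queue.put(_DONE)

    def join(self, timeout=None):
        """Wait for the producer thread (used below to show it ran eagerly)."""
        self._thread.join(timeout)

    def __iter__(self):
        return self

    def __next__(self):
        if self._finished:
            raise StopIteration
        item = self._queue.get()
        if item is _DONE:
            self._finished = True
            raise StopIteration
        return item


cold_log, hot_log = [], []
cold = cold_source(cold_log)              # nothing runs yet
hot = HotGenerator(cold_source(hot_log))  # producer thread starts immediately
hot.join(timeout=5)                       # let the eager producer finish
print(cold_log)   # []
print(hot_log)    # ['produce 0', 'produce 1', 'produce 2']
print(list(hot))  # [0, 1, 2]
```

The hot wrapper has done all of its work before a single item is consumed, which is why a doc comment calling this out (and saying how much it may buffer) would help callers reason about memory use.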
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -1175,8 +1179,15 @@ Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
   ARROW_ASSIGN_OR_RAISE(auto batch_gen_gen,
                         FragmentsToBatches(std::move(fragment_gen), scan_options));
-  auto merged_batch_gen =
-      MakeMergedGenerator(std::move(batch_gen_gen), scan_options->fragment_readahead);
+  AsyncGenerator<EnumeratedRecordBatch> merged_batch_gen;
+  if (require_sequenced_output) {
+    ARROW_ASSIGN_OR_RAISE(merged_batch_gen,
+                          MakeSequencedMergedGenerator(std::move(batch_gen_gen),
+                                                       scan_options->fragment_readahead));
Review comment:
(Though I suppose it shouldn't hurt.)
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1156,6 +1176,23 @@ AsyncGenerator<T> MakeMergedGenerator(AsyncGenerator<AsyncGenerator<T>> source,
   return MergedGenerator<T>(std::move(source), max_subscriptions);
 }
+template <typename T>
+Result<AsyncGenerator<T>> MakeSequencedMergedGenerator(
+    AsyncGenerator<AsyncGenerator<T>> source, int max_subscriptions) {
+  if (max_subscriptions < 0) {
+    return Status::Invalid("max_subscriptions must be a positive integer");
+  }
+  if (max_subscriptions == 1) {
+    return Status::Invalid("Use MakeConcatenatedGenerator is max_subscriptions is 1");
Review comment:
```suggestion
    return Status::Invalid("Use MakeConcatenatedGenerator if max_subscriptions is 1");
```
##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1156,6 +1176,23 @@ AsyncGenerator<T> MakeMergedGenerator(AsyncGenerator<AsyncGenerator<T>> source,
   return MergedGenerator<T>(std::move(source), max_subscriptions);
 }
+template <typename T>
+Result<AsyncGenerator<T>> MakeSequencedMergedGenerator(
Review comment:
So just to make sure I get this right, this will "kick off" some number
of sub-generators (so they start doing work), but will only actually pull from
the first one (so the others shouldn't queue up too much)?
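The behavior this comment describes can be modeled in Python, with threads and bounded queues standing in for Arrow's futures and executors. `sequenced_merge`, `_start`, and `max_buffered` are hypothetical names; this is a sketch of the reviewer's reading of the semantics, not the PR's implementation.

```python
import collections
import queue
import threading

_DONE = object()  # end-of-stream marker


def _start(sub, max_buffered):
    """Drain `sub` on a background thread into a bounded queue.

    Because the queue is bounded, a sub-generator that is not at the head
    stalls after `max_buffered` items instead of queueing without limit.
    """
    buf = queue.Queue(maxsize=max_buffered)

    def run():
        for item in sub:
            buf.put(item)  # blocks while the buffer is full
        buf.put(_DONE)

    threading.Thread(target=run, daemon=True).start()
    return buf


def sequenced_merge(sources, max_subscriptions, max_buffered=4):
    """Keep up to `max_subscriptions` sub-generators running, but emit items
    only from the oldest one until it is exhausted."""
    if max_subscriptions < 1:
        raise ValueError("max_subscriptions must be a positive integer")
    return _sequenced_merge(iter(sources), max_subscriptions, max_buffered)


def _sequenced_merge(sources, max_subscriptions, max_buffered):
    active = collections.deque()  # buffers of running sub-generators, oldest first

    def top_up():
        # Start new sub-generators until the subscription limit is reached.
        while len(active) < max_subscriptions:
            sub = next(sources, _DONE)
            if sub is _DONE:
                return
            active.append(_start(sub, max_buffered))

    top_up()
    while active:
        item = active[0].get()  # only ever pull from the head
        if item is _DONE:
            active.popleft()
            top_up()  # a slot freed up: kick off the next sub-generator
            continue
        yield item


# Usage: `first` reports whether `second` was already running while `first`
# was still producing; with max_subscriptions=2 it was.
second_started = threading.Event()


def first():
    yield second_started.wait(timeout=5)


def second():
    second_started.set()
    yield "b"


result = list(sequenced_merge([first(), second()], max_subscriptions=2))
print(result)  # [True, 'b']
```

With `max_subscriptions=1` only one sub-generator runs at a time, so the sketch degenerates to plain concatenation (and `first` would time out and yield `False`), which is consistent with the PR steering that case to `MakeConcatenatedGenerator`.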
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -1175,8 +1179,15 @@ Result<compute::ExecNode*> MakeScanNode(compute::ExecPlan* plan,
   ARROW_ASSIGN_OR_RAISE(auto batch_gen_gen,
                         FragmentsToBatches(std::move(fragment_gen), scan_options));
-  auto merged_batch_gen =
-      MakeMergedGenerator(std::move(batch_gen_gen), scan_options->fragment_readahead);
+  AsyncGenerator<EnumeratedRecordBatch> merged_batch_gen;
+  if (require_sequenced_output) {
+    ARROW_ASSIGN_OR_RAISE(merged_batch_gen,
+                          MakeSequencedMergedGenerator(std::move(batch_gen_gen),
+                                                       scan_options->fragment_readahead));
Review comment:
Does this mean MakeSequencingGenerator is now redundant, since we pull
from subgenerators sequentially?
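To make the redundancy question concrete, here is a hypothetical Python `resequence` helper (not Arrow's `MakeSequencingGenerator`) that restores index order with a min-heap. It only buffers when items arrive out of order; on input that is already in order it passes every item straight through. Whether the merged scan output is always in order also depends on each fragment emitting its own batches in order, which this sketch simply assumes.

```python
import heapq


def resequence(items, first_index=0, stats=None):
    """Yield values in index order from (index, value) pairs that may arrive
    out of order. If `stats` is a dict, its "max_pending" entry records the
    largest number of items that had to wait for an earlier index."""
    pending = []  # min-heap of (index, value) pairs that arrived early
    expected = first_index
    if stats is not None:
        stats["max_pending"] = 0
    for index, value in items:
        heapq.heappush(pending, (index, value))
        # Release every item that is now next in line.
        while pending and pending[0][0] == expected:
            yield heapq.heappop(pending)[1]
            expected += 1
        if stats is not None:
            stats["max_pending"] = max(stats["max_pending"], len(pending))
    if pending:
        raise ValueError(f"input ended while waiting for index {expected}")


shuffled_stats, ordered_stats = {}, {}
shuffled = list(resequence([(2, "c"), (0, "a"), (1, "b")], stats=shuffled_stats))
ordered = list(resequence([(0, "a"), (1, "b"), (2, "c")], stats=ordered_stats))
print(shuffled, shuffled_stats["max_pending"])  # ['a', 'b', 'c'] 1
print(ordered, ordered_stats["max_pending"])    # ['a', 'b', 'c'] 0
```

If the sequenced merge already guarantees ordered output, a resequencing stage like this never buffers anything (`max_pending` stays 0), so it would cost little but also do nothing.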
--
This is an automated message from the Apache Git Service.