westonpace commented on a change in pull request #10076:
URL: https://github.com/apache/arrow/pull/10076#discussion_r619474881
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -480,13 +492,24 @@ Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
return table_fut.result();
}
+Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync() {
+ return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
+}
+
Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
internal::Executor* cpu_executor) {
auto self = shared_from_this();
ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
ARROW_ASSIGN_OR_RAISE(auto batch_gen_gen,
FragmentsToBatches(self, std::move(fragment_gen)));
- return MakeConcatenatedGenerator(std::move(batch_gen_gen));
+ auto batch_gen_gen_readahead = MakeSerialReadaheadGenerator(
+ std::move(batch_gen_gen), scan_options_->fragment_readahead);
+ return MakeMergedGenerator(std::move(batch_gen_gen_readahead),
+ scan_options_->fragment_readahead);
+}
Review comment:
> So here we're teeing up at most fragment_readahead active fragments, of which we can queue up to fragment_readahead batches?
I don't know that I would phrase it that way. The first part, `we're teeing
up at most fragment_readahead active fragments`, is correct. Each active
fragment will queue 1 batch, so across all active fragments there will be
`fragment_readahead` queued batches in total. However, there are not
`fragment_readahead` batches per fragment. I'm not sure which of the two you
were describing.
The last sentence is correct assuming that both `batch_gen_gen` and the
generators emitted by `batch_gen_gen` are truly asynchronous (e.g. they are
calling `Executor::Submit` at some point). The naive `FileFormat::ScanBatches`
blocks and isn't truly asynchronous, so `MakeMergedGenerator` won't add
parallelism in that case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]