bkietz commented on code in PR #37918:
URL: https://github.com/apache/arrow/pull/37918#discussion_r1340182361
##########
cpp/src/arrow/dataset/file_base.cc:
##########
@@ -81,6 +81,18 @@ Result<std::shared_ptr<io::RandomAccessFile>>
FileSource::Open() const {
return custom_open_();
}
+Future<std::shared_ptr<io::RandomAccessFile>> FileSource::OpenAsync() const {
+ if (filesystem_) {
+ return filesystem_->OpenInputFileAsync(file_info_);
+ }
+
+ if (buffer_) {
+ return
Future<std::shared_ptr<io::RandomAccessFile>>::MakeFinished(std::make_shared<io::BufferReader>(buffer_));
+ }
+
+ return
Future<std::shared_ptr<io::RandomAccessFile>>::MakeFinished(custom_open_());
Review Comment:
I think this TODO is valuable but should be linked to a new issue where we
extend `custom_open_` to return `Future<std::share_ptr<io::RandomAccessFile>>`.
That would be necessary to enable a custom but non blocking open
##########
cpp/src/arrow/dataset/file_parquet.cc:
##########
@@ -479,11 +479,21 @@ Future<std::shared_ptr<parquet::arrow::FileReader>>
ParquetFileFormat::GetReader
default_fragment_scan_options));
auto properties =
MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);
- ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+ auto input_fut = source.OpenAsync();
// TODO(ARROW-12259): workaround since we have Future<(move-only type)>
- auto reader_fut = parquet::ParquetFileReader::OpenAsync(
- std::move(input), std::move(properties), metadata);
+
auto path = source.path();
+ auto reader_fut =
+ input_fut.Then([=](const std::shared_ptr<arrow::io::RandomAccessFile>&)
mutable
+ -> Result<std::unique_ptr<parquet::ParquetFileReader>> {
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::RandomAccessFile>
input,
+ input_fut.MoveResult());
Review Comment:
Instead of copying `input_fut` implicitly by keeping it in closure, I think
it'd be more clear to use the value passed to the callback:
```suggestion
source.OpenAsync().Then([=](std::shared_ptr<arrow::io::RandomAccessFile> input)
-> Result<std::unique_ptr<parquet::ParquetFileReader>> {
```
(The `TODO(ARROW-12259)` only applies to futures of move-only types as
returned by `ParquetFileReader::OpenAsync()`, and doesn't apply to the return
value of `source.OpenAsync()`.)
##########
cpp/src/arrow/dataset/file_parquet.cc:
##########
@@ -479,11 +479,21 @@ Future<std::shared_ptr<parquet::arrow::FileReader>>
ParquetFileFormat::GetReader
default_fragment_scan_options));
auto properties =
MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);
- ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+ auto input_fut = source.OpenAsync();
// TODO(ARROW-12259): workaround since we have Future<(move-only type)>
- auto reader_fut = parquet::ParquetFileReader::OpenAsync(
- std::move(input), std::move(properties), metadata);
+
auto path = source.path();
+ auto reader_fut =
+ input_fut.Then([=](const std::shared_ptr<arrow::io::RandomAccessFile>&)
mutable
+ -> Result<std::unique_ptr<parquet::ParquetFileReader>> {
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::RandomAccessFile>
input,
+ input_fut.MoveResult());
+ auto rfut = parquet::ParquetFileReader::OpenAsync(
+ std::move(input), std::move(properties), metadata);
+ ARROW_ASSIGN_OR_RAISE(auto reader, rfut.MoveResult());
+ return reader;
Review Comment:
For a continuation (callback passed to Future::Then) the return type can
also be Future. Please change it to Future, then you can return `rfut` directly
instead of blocking in a continuation
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]