lidavidm commented on a change in pull request #9620:
URL: https://github.com/apache/arrow/pull/9620#discussion_r643102629
##########
File path: cpp/src/parquet/file_reader.cc
##########
@@ -563,6 +692,28 @@ std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
return Open(std::move(source), props, std::move(metadata));
}
+::arrow::Future<std::unique_ptr<ParquetFileReader>> ParquetFileReader::OpenAsync(
+ std::shared_ptr<::arrow::io::RandomAccessFile> source, const ReaderProperties& props,
+ std::shared_ptr<FileMetaData> metadata) {
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ auto fut = SerializedFile::OpenAsync(std::move(source), props, std::move(metadata));
+ // TODO(ARROW-12259): workaround since we have Future<(move-only type)>
Review comment:
The workaround is to use `AddCallback` and mark a future complete
manually, instead of using `Then`.
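
For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of "add a callback and complete a second future by hand". `ToyFuture`, `Contents`, `Reader`, and `WrapContents` are hypothetical stand-ins invented for illustration; this is not the `arrow::Future` API and not the code in this PR.

```cpp
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical toy future (single-threaded, no error state); NOT arrow::Future.
template <typename T>
class ToyFuture {
 public:
  ToyFuture() : state_(std::make_shared<State>()) {}

  // Run `cb` once the value is available (immediately if already finished).
  void AddCallback(std::function<void(T&)> cb) {
    if (state_->finished) {
      cb(*state_->value);
    } else {
      state_->callbacks.push_back(std::move(cb));
    }
  }

  // Store the value and fire all pending callbacks.
  void MarkFinished(T value) {
    state_->value.reset(new T(std::move(value)));
    state_->finished = true;
    for (auto& cb : state_->callbacks) cb(*state_->value);
    state_->callbacks.clear();
  }

  bool is_finished() const { return state_->finished; }
  T& value() { return *state_->value; }

 private:
  struct State {
    bool finished = false;
    std::unique_ptr<T> value;
    std::vector<std::function<void(T&)>> callbacks;
  };
  std::shared_ptr<State> state_;
};

// Hypothetical payload types, used only to have something move-only to pass.
struct Contents { int num_row_groups; };
struct Reader { std::unique_ptr<Contents> contents; };

// Rather than chaining with a Then-style continuation, create the output
// future up front and complete it manually from a callback, moving the
// move-only payload out of the input future's result.
inline ToyFuture<std::unique_ptr<Reader>> WrapContents(
    ToyFuture<std::unique_ptr<Contents>> fut) {
  ToyFuture<std::unique_ptr<Reader>> out;
  fut.AddCallback([out](std::unique_ptr<Contents>& contents) mutable {
    std::unique_ptr<Reader> reader(new Reader{std::move(contents)});
    out.MarkFinished(std::move(reader));
  });
  return out;
}
```

The callback receives the result by reference, so it can move the `unique_ptr` out without the future ever needing to copy its value.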
##########
File path: cpp/src/arrow/util/future.h
##########
@@ -829,6 +829,9 @@ Future<std::vector<Result<T>>> All(std::vector<Future<T>> futures) {
return out;
}
+template <>
+inline Future<>::Future(Status s) : Future(internal::Empty::ToResult(std::move(s))) {}
Review comment:
This is needed because the usual implicit constructor for `Future<Status>`
delegates to `Future(Result<ValueType>(Status))`, which errors on an OK Status
(the assumption being that an OK Status will never arise). For `Future<>`,
however, we do want to handle an OK Status.
It's not defined inline in the class body because it's a specialization for
`Future<>`. I tried to use `enable_if` to do it inline but couldn't figure out
the template magic needed to get it to work.
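
To illustrate the shape of this, a simplified sketch follows. `Status`, `Result`, `Empty`, and `Future` here are hypothetical toy types, not the Arrow classes; the sketch only shows a generic constructor that rejects an OK status, plus an explicit specialization at namespace scope that accepts one.

```cpp
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical, simplified stand-ins; not the Arrow classes.
struct Status {
  bool is_ok;
  std::string message;
  static Status OK() { return Status{true, ""}; }
  static Status Invalid(std::string msg) { return Status{false, std::move(msg)}; }
};

struct Empty {};

template <typename T>
class Result {
 public:
  // A Result built from a Status is expected to hold an error.
  explicit Result(Status s) : status_(std::move(s)) {
    if (status_.is_ok) throw std::invalid_argument("Result constructed from OK Status");
  }
  explicit Result(T value) : status_(Status::OK()), value_(std::move(value)) {}
  bool ok() const { return status_.is_ok; }

 private:
  Status status_;
  T value_{};
};

template <typename T>
class Future {
 public:
  Future(Result<T> r) : ok_(r.ok()) {}
  // Generic path: delegates to Result<T>(Status), which rejects an OK Status.
  Future(Status s) : Future(Result<T>(std::move(s))) {}
  bool ok() const { return ok_; }

 private:
  bool ok_;
};

// Explicit specialization of the constructor for Future<Empty>. It has to be
// written at namespace scope after the class template, and is marked `inline`
// so it can live in a header. An OK Status becomes a successful empty result.
template <>
inline Future<Empty>::Future(Status s)
    : Future(s.is_ok ? Result<Empty>(Empty{}) : Result<Empty>(std::move(s))) {}
```

With this in place, `Future<Empty>(Status::OK())` is a finished, successful future, while `Future<int>(Status::OK())` still hits the generic path and throws.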
##########
File path: cpp/src/parquet/arrow/reader.h
##########
@@ -21,6 +21,8 @@
#include <memory>
#include <vector>
+#include "arrow/util/async_generator.h"
Review comment:
I think we've run into this before - with the current definition of
AsyncGenerator we can't easily provide a forward declaration, right?
##########
File path: cpp/src/parquet/file_reader.cc
##########
@@ -264,23 +264,92 @@ class SerializedFile : public ParquetFileReader::Contents {
}
}
PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges));
- return cached_source_->Wait();
}
+ ::arrow::Future<> WhenBuffered(const std::vector<int>& row_groups,
+ const std::vector<int>& column_indices) const {
+ if (!cached_source_) {
+ return ::arrow::Status::Invalid("Must call PreBuffer before WhenBuffered");
+ }
+ std::vector<::arrow::io::ReadRange> ranges;
+ for (int row : row_groups) {
+ for (int col : column_indices) {
+ ranges.push_back(
+ ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col));
+ }
+ }
+ return cached_source_->WaitFor(ranges);
+ }
+
+ // Metadata/footer parsing. Divided up to separate sync/async paths, and to use
+ // exceptions for error handling (with the async path converting to Future/Status).
+
void ParseMetaData() {
Review comment:
I originally did that, but then the encryption test started flaking on
Windows. I couldn't see why this change would affect it (since we wait for
the future to complete), but refactoring it this way fixed the flakiness.
It also means we don't need a custom StatusDetail to preserve the class of
the original exception.
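
As a rough illustration of the split described here (parsing helpers throw, and only the async boundary converts to a status), consider the sketch below. `Status`, `ParseFooterLength`, and `CatchToStatus` are hypothetical names invented for this example; the PR itself uses the Parquet exception macros and `arrow::Status`.

```cpp
#include <exception>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for a status type; not arrow::Status.
struct Status {
  bool is_ok;
  std::string message;
};

// Shared parsing step that reports errors by throwing. Synchronous callers
// call it directly and see the original exception type unchanged.
inline int ParseFooterLength(const std::string& footer) {
  if (footer.size() < 4) throw std::runtime_error("footer too small");
  return static_cast<int>(footer.size()) - 4;
}

// Async-facing wrapper: runs a throwing callable and converts any
// std::exception into an error Status at the boundary.
template <typename Fn>
Status CatchToStatus(Fn&& fn) {
  try {
    std::forward<Fn>(fn)();
    return Status{true, ""};
  } catch (const std::exception& e) {
    return Status{false, e.what()};
  }
}
```

Because the synchronous path never round-trips through a status, nothing extra is needed there to remember which exception class was thrown.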
##########
File path: cpp/src/parquet/encryption/test_encryption_util.cc
##########
@@ -334,7 +335,10 @@ void FileDecryptor::DecryptFile(
reader_properties.file_decryption_properties(file_decryption_properties->DeepClone());
}
- auto file_reader = parquet::ParquetFileReader::OpenFile(file, false, reader_properties);
+ std::shared_ptr<::arrow::io::RandomAccessFile> source;
+ PARQUET_ASSIGN_OR_THROW(
+ source, ::arrow::io::ReadableFile::Open(file, reader_properties.memory_pool()));
Review comment:
I was having issues with the encryption tests on Windows: they would
flake because the temp Parquet file was still open when the test tried to
delete it. This was an attempt at fixing that.
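
The general idea (not the PR's code) is to make sure every handle to the temp file is closed before the file is removed. A minimal sketch using only the standard library, with a hypothetical helper `ReadThenDelete`:

```cpp
#include <cstdio>
#include <fstream>
#include <string>

// Hypothetical helper: writes a temp file, reads it back, then deletes it.
// Returns true if the contents round-tripped and the delete succeeded.
inline bool ReadThenDelete(const std::string& path) {
  {
    std::ofstream out(path);
    out << "data";
  }  // writer handle is closed here
  std::string contents;
  {
    std::ifstream in(path);
    in >> contents;
  }  // reader handle is closed here, before the file is deleted
  // On Windows, removing a file that still has an open handle typically
  // fails, so the scopes above matter there.
  return contents == "data" && std::remove(path.c_str()) == 0;
}
```

Holding the file source in a named variable, as the test change does, makes it easier to control when the handle is released.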