lidavidm commented on a change in pull request #9620:
URL: https://github.com/apache/arrow/pull/9620#discussion_r621179845



##########
File path: cpp/src/arrow/python/io.cc
##########
@@ -265,6 +266,14 @@ Result<std::shared_ptr<Buffer>> 
PyReadableFile::ReadAt(int64_t position, int64_t
   });
 }
 
+Future<std::shared_ptr<Buffer>> PyReadableFile::ReadAsync(const io::IOContext&,

Review comment:
       The issue is when the main thread holds the GIL and calls back into 
libarrow, and then libarrow tries to call back into Python. This is OK if it's 
all on the same thread but not if it's on different threads. Really, this is an 
implementation error (any such bindings in pyarrow should release the GIL) but 
I remember it bit me somewhere during this PR.

##########
File path: cpp/src/arrow/io/caching.cc
##########
@@ -33,7 +34,7 @@ namespace io {
 
 CacheOptions CacheOptions::Defaults() {
   return CacheOptions{internal::ReadRangeCache::kDefaultHoleSizeLimit,
-                      internal::ReadRangeCache::kDefaultRangeSizeLimit};
+                      internal::ReadRangeCache::kDefaultRangeSizeLimit, false};

Review comment:
       Note I split this out into ARROW-12522/#10145 so I'll fix things there 
and rebase here.

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -968,6 +979,99 @@ Status FileReaderImpl::GetRecordBatchReader(const 
std::vector<int>& row_groups,
   return Status::OK();
 }
 
+/// Given a file reader and a list of row groups, this is a generator of record
+/// batch vectors (where each vector is the contents of a single row group).
+class RowGroupGenerator {
+ public:
+  using Item = ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>;
+
+  explicit RowGroupGenerator(FileReaderImpl* arrow_reader,
+                             ::arrow::internal::Executor* executor,
+                             std::vector<int> row_groups, std::vector<int> 
column_indices)
+      : arrow_reader_(arrow_reader),
+        executor_(executor),
+        row_groups_(std::move(row_groups)),
+        column_indices_(std::move(column_indices)),
+        index_(0) {}
+
+  ::arrow::Future<Item> operator()() {
+    if (index_ >= row_groups_.size()) {
+      return 
::arrow::Future<Item>::MakeFinished(::arrow::IterationEnd<Item>());
+    }
+    int row_group = row_groups_[index_++];
+    std::vector<int> column_indices = column_indices_;
+    auto reader = arrow_reader_;
+    if (!reader->properties().pre_buffer()) {
+      return SubmitRead(executor_, reader, row_group, column_indices);
+    }
+    BEGIN_PARQUET_CATCH_EXCEPTIONS

Review comment:
       It just expands to a `try { } catch`. However I'll audit these again…I 
dislike working in this module because we're mixing exceptions/Status and the 
OpenAsync work has just exacerbated that problem a lot.

##########
File path: cpp/src/parquet/file_reader.cc
##########
@@ -277,43 +292,56 @@ class SerializedFile : public ParquetFileReader::Contents 
{
     }
 
     int64_t footer_read_size = std::min(source_size_, kDefaultFooterReadSize);
-    PARQUET_ASSIGN_OR_THROW(
-        auto footer_buffer,
-        source_->ReadAt(source_size_ - footer_read_size, footer_read_size));
-
-    // Check if all bytes are read. Check if last 4 bytes read have the magic 
bits
-    if (footer_buffer->size() != footer_read_size ||
-        (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetMagic, 
4) != 0 &&
-         memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 
4) != 0)) {
-      throw ParquetInvalidOrCorruptedFileException(
-          "Parquet magic bytes not found in footer. Either the file is 
corrupted or this "
-          "is not a parquet file.");
-    }
-
-    if (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 
4) == 0) {
-      // Encrypted file with Encrypted footer.
-      ParseMetaDataOfEncryptedFileWithEncryptedFooter(footer_buffer, 
footer_read_size);
-      return;
-    }
+    return source_->ReadAsync(source_size_ - footer_read_size, 
footer_read_size)
+        .Then([=](const std::shared_ptr<::arrow::Buffer>& footer_buffer)
+                  -> ::arrow::Future<> {
+          // Check if all bytes are read. Check if last 4 bytes read have the 
magic bits
+          if (footer_buffer->size() != footer_read_size ||
+              (memcmp(footer_buffer->data() + footer_read_size - 4, 
kParquetMagic, 4) !=
+                   0 &&
+               memcmp(footer_buffer->data() + footer_read_size - 4, 
kParquetEMagic, 4) !=
+                   0)) {
+            return ::arrow::Status::FromDetailAndArgs(
+                ::arrow::StatusCode::IOError,
+                ParquetInvalidOrCorruptedFileStatusDetail::Instance(),
+                "Parquet magic bytes not found in footer. Either the file is 
corrupted "
+                "or this is not a parquet file.");
+          }
 
-    // No encryption or encryption with plaintext footer mode.
-    std::shared_ptr<Buffer> metadata_buffer;
-    uint32_t metadata_len, read_metadata_len;
-    ParseUnencryptedFileMetadata(footer_buffer, footer_read_size, 
&metadata_buffer,
-                                 &metadata_len, &read_metadata_len);
+          if (memcmp(footer_buffer->data() + footer_read_size - 4, 
kParquetEMagic, 4) ==
+              0) {
+            // Encrypted file with Encrypted footer.
+            BEGIN_PARQUET_CATCH_EXCEPTIONS
+            return 
ParseMetaDataOfEncryptedFileWithEncryptedFooter(footer_buffer,

Review comment:
       I'm going to try and rework what's here as mixing 
exceptions/Status/Future is painful + confusing.

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -968,6 +979,99 @@ Status FileReaderImpl::GetRecordBatchReader(const 
std::vector<int>& row_groups,
   return Status::OK();
 }
 
+/// Given a file reader and a list of row groups, this is a generator of record
+/// batch vectors (where each vector is the contents of a single row group).
+class RowGroupGenerator {
+ public:
+  using Item = ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>;
+
+  explicit RowGroupGenerator(FileReaderImpl* arrow_reader,
+                             ::arrow::internal::Executor* executor,
+                             std::vector<int> row_groups, std::vector<int> 
column_indices)
+      : arrow_reader_(arrow_reader),

Review comment:
       I'll change the API to have the user pass in the pointer. I'd rather 
have `shared_from_this` but I guess that won't work since it's typically used 
as a unique_ptr.

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -325,6 +325,45 @@ Result<std::unique_ptr<parquet::arrow::FileReader>> 
ParquetFileFormat::GetReader
   return std::move(arrow_reader);
 }
 
+Future<std::shared_ptr<parquet::arrow::FileReader>> 
ParquetFileFormat::GetReaderAsync(
+    const FileSource& source, ScanOptions* options) const {
+  ARROW_ASSIGN_OR_RAISE(auto parquet_scan_options,
+                        GetFragmentScanOptions<ParquetFragmentScanOptions>(
+                            kParquetTypeName, options, 
default_fragment_scan_options));
+  MemoryPool* pool = options ? options->pool : default_memory_pool();
+  auto properties = MakeReaderProperties(*this, parquet_scan_options.get(), 
pool);
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+  // Some ugliness needed due to having Future<unique_ptr<>> here
+  auto reader_fut =
+      parquet::ParquetFileReader::OpenAsync(std::move(input), 
std::move(properties));
+  auto path = source.path();
+  auto self = checked_pointer_cast<const 
ParquetFileFormat>(shared_from_this());
+  return reader_fut.Then(
+      [=](const std::unique_ptr<parquet::ParquetFileReader>&) mutable
+      -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
+        ARROW_ASSIGN_OR_RAISE(std::unique_ptr<parquet::ParquetFileReader> 
reader,

Review comment:
       In this case it works since Future internally heap-allocates its 
implementation, and yes, this is rather iffy, but works since this is the only 
callback. I'll add a TODO referencing ARROW-12259.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to