bkietz commented on code in PR #37514:
URL: https://github.com/apache/arrow/pull/37514#discussion_r1330104938


##########
cpp/src/arrow/dataset/file_parquet.cc:
##########
@@ -410,13 +410,50 @@ Result<std::shared_ptr<Schema>> 
ParquetFileFormat::Inspect(
 
 Result<std::shared_ptr<parquet::arrow::FileReader>> 
ParquetFileFormat::GetReader(
     const FileSource& source, const std::shared_ptr<ScanOptions>& options) 
const {
-  return GetReaderAsync(source, options, nullptr).result();
+  return GetReader(source, options, /*metadata=*/nullptr);
 }
 
 Result<std::shared_ptr<parquet::arrow::FileReader>> 
ParquetFileFormat::GetReader(
     const FileSource& source, const std::shared_ptr<ScanOptions>& options,
     const std::shared_ptr<parquet::FileMetaData>& metadata) const {
-  return GetReaderAsync(source, options, metadata).result();
+  ARROW_ASSIGN_OR_RAISE(
+      auto parquet_scan_options,
+      GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName, 
options.get(),
+                                                         
default_fragment_scan_options));
+  auto properties =
+      MakeReaderProperties(*this, parquet_scan_options.get(), options->pool);
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+  // `parquet::ParquetFileReader::Open` will not wrap the exception as status,
+  // so using `open_parquet_file` to wrap it.
+  auto open_parquet_file = [&]() -> 
Result<std::unique_ptr<parquet::ParquetFileReader>> {
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    auto reader = parquet::ParquetFileReader::Open(std::move(input),
+                                                   std::move(properties), 
metadata);
+    return reader;
+    END_PARQUET_CATCH_EXCEPTIONS
+  };
+
+  auto reader_opt = open_parquet_file();
+  if (!reader_opt.ok()) {
+    return WrapSourceError(reader_opt.status(), source.path());
+  }
+  auto reader = std::move(reader_opt).ValueOrDie();
+
+  std::shared_ptr<parquet::FileMetaData> reader_metadata = reader->metadata();
+  auto arrow_properties = MakeArrowReaderProperties(*this, *reader_metadata);
+  arrow_properties.set_batch_size(options->batch_size);

Review Comment:
   Please extract as much repeated code as possible into helper/private member 
functions. For example, add an `ArrowReaderPropertiesFromOptions` helper, or add 
an optional `ScanOptions` argument to `MakeArrowReaderProperties`.



##########
cpp/src/arrow/dataset/file_parquet_test.cc:
##########
@@ -367,6 +369,69 @@ TEST_F(TestParquetFileFormat, MultithreadedScan) {
   ASSERT_EQ(batches.size(), kNumRowGroups);
 }
 
+class AsyncBufferReader : public ::arrow::io::BufferReader {
+ public:
+  explicit AsyncBufferReader(std::shared_ptr<Buffer> buffer,
+                             const ::arrow::io::IOContext& ctx)
+      : ::arrow::io::BufferReader(std::move(buffer)), ctx_(ctx) {}
+  /// EXPERIMENTAL: Read data asynchronously.
+  Future<std::shared_ptr<Buffer>> ReadAsync(const ::arrow::io::IOContext& ctx,
+                                            int64_t position, int64_t nbytes) 
override {
+    auto self = checked_pointer_cast<AsyncBufferReader>(shared_from_this());
+    return DeferNotOk(ctx.executor()->Submit(
+        [self, position, nbytes]() { return self->ReadAt(position, nbytes); 
}));
+  }
+
+  const ::arrow::io::IOContext& io_context() const override { return ctx_; }
+
+ private:
+  ::arrow::io::IOContext ctx_;
+};
+
+TEST_F(TestParquetFileFormat, SingleThreadExecutor) {
+  ::arrow::io::IOContext default_io_context;
+  auto pool_executor =
+      
dynamic_cast<::arrow::internal::ThreadPool*>(default_io_context.executor());
+  if (pool_executor == nullptr) {
+    GTEST_SKIP();
+  }
+  auto origin_capacity = pool_executor->GetCapacity();
+  ASSERT_OK(pool_executor->SetCapacity(/*threads=*/1));
+
+  // Reset capacity for pool_executor
+  struct PoolResetGuard {
+    PoolResetGuard(::arrow::internal::ThreadPool* pool, int origin_capacity)
+        : pool(pool), origin_capacity(origin_capacity) {}
+    ~PoolResetGuard() {
+      Status s = pool->SetCapacity(origin_capacity);
+      if (!s.ok()) {
+        std::cerr << "Failed to reset pool capacity: " << s.ToString() << 
std::endl;
+      }
+    }
+
+    ::arrow::internal::ThreadPool* pool;
+    int origin_capacity;
+  };
+  PoolResetGuard guard(pool_executor, origin_capacity);

Review Comment:
   I think you can use `SetIOThreadPoolCapacity` and `GetIOThreadPoolCapacity` 
here:
   
   ```suggestion
     // Reset capacity for pool_executor
     struct PoolResetGuard {
       int original_capacity = io::GetIOThreadPoolCapacity();
       ~PoolResetGuard() {
         DCHECK_OK(io::SetIOThreadPoolCapacity(original_capacity));
       }
     } guard;
   ```
   
   Since this directly mutates the IO thread pool (which the default IO context 
points to), you shouldn't need `AsyncBufferReader::ReadAsync`.
   
   I agree that it's odd that `BufferReader::ReadAsync` and 
`MemoryMappedFile::ReadAsync` (the only overrides I see) ignore the IO context 
argument. @lidavidm, do you think they should transfer to the executor of the 
passed context if it differs from their own?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to