westonpace commented on a change in pull request #9656:
URL: https://github.com/apache/arrow/pull/9656#discussion_r621430711



##########
File path: cpp/src/arrow/dataset/file_ipc.cc
##########
@@ -73,6 +88,27 @@ static inline Result<std::vector<int>> GetIncludedFields(
   return included_fields;
 }
 
+static inline Result<ipc::IpcReadOptions> GetReadOptions(
+    const Schema& schema, FileFormat* format, const ScanOptions* scan_options) 
{

Review comment:
       Nit: I'm pretty sure you can do `const ScanOptions& scan_options` here.

##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -1009,6 +1010,47 @@ struct FileWriterHelper {
   int64_t footer_offset_;
 };
 
+struct FileGeneratorWriterHelper : public FileWriterHelper {
+  Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* 
out_batches,
+                     ReadStats* out_stats = nullptr) override {
+    auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
+    AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+
+    {
+      auto fut =
+          RecordBatchFileReader::OpenAsync(buf_reader.get(), footer_offset_, 
options);
+      RETURN_NOT_OK(fut.status());
+      EXPECT_FINISHES_OK_AND_ASSIGN(auto reader, fut);
+      EXPECT_EQ(num_batches_written_, reader->num_record_batches());
+      // Generator's lifetime is independent of the reader's
+      ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator());
+    }
+
+    // Generator is async-reentrant
+    std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
+    for (int i = 0; i < num_batches_written_; ++i) {
+      futures.push_back(generator());
+    }
+    auto fut = generator();
+    EXPECT_FINISHES_OK_AND_ASSIGN(auto extra_read, fut);
+    EXPECT_EQ(nullptr, extra_read);
+    for (auto& future : futures) {
+      EXPECT_FINISHES_OK_AND_ASSIGN(auto batch, future);
+      out_batches->push_back(batch);
+    }
+
+    // The generator doesn't track stats.
+    EXPECT_EQ(nullptr, out_stats);
+
+    return Status::OK();
+  }
+
+  Status Read(const IpcReadOptions& options, RecordBatchVector* out_batches,

Review comment:
       This isn't related to this PR, but why are there both `Read` and `ReadBatches`?

##########
File path: cpp/src/arrow/dataset/file_ipc.cc
##########
@@ -173,6 +192,28 @@ Result<ScanTaskIterator> IpcFileFormat::ScanFile(
   return IpcScanTaskIterator::Make(options, fragment);
 }
 
+Result<RecordBatchGenerator> IpcFileFormat::ScanBatchesAsync(
+    const std::shared_ptr<ScanOptions>& options,
+    const std::shared_ptr<FileFragment>& file) {
+  auto self = shared_from_this();
+  auto source = file->source();
+  auto open_reader = OpenReaderAsync(source);
+  auto reopen_reader = [self, options,
+                        source](std::shared_ptr<ipc::RecordBatchFileReader> 
reader)
+      -> Future<std::shared_ptr<ipc::RecordBatchFileReader>> {
+    ARROW_ASSIGN_OR_RAISE(auto options,
+                          GetReadOptions(*reader->schema(), self.get(), 
options.get()));
+    return OpenReader(source, options);
+  };
+  auto readahead_level = options->batch_readahead;
+  auto open_generator = [=](std::shared_ptr<ipc::RecordBatchFileReader> reader)

Review comment:
       Nit: Change `reader` to a const reference.  Since there could be 
multiple callbacks, we can never move into a callback, so it's always a const 
reference or a copy.

##########
File path: cpp/src/arrow/dataset/file_ipc.cc
##########
@@ -73,6 +88,27 @@ static inline Result<std::vector<int>> GetIncludedFields(
   return included_fields;
 }
 
+static inline Result<ipc::IpcReadOptions> GetReadOptions(
+    const Schema& schema, FileFormat* format, const ScanOptions* scan_options) 
{
+  ARROW_ASSIGN_OR_RAISE(
+      auto ipc_scan_options,
+      GetFragmentScanOptions<IpcFragmentScanOptions>(
+          kIpcTypeName, scan_options, format->default_fragment_scan_options));
+  auto options =
+      ipc_scan_options->options ? *ipc_scan_options->options : 
default_read_options();
+  options.memory_pool = scan_options->pool;
+  options.use_threads = false;

Review comment:
       Why force `use_threads` to false?

##########
File path: cpp/src/arrow/dataset/file_ipc.cc
##########
@@ -73,6 +88,27 @@ static inline Result<std::vector<int>> GetIncludedFields(
   return included_fields;
 }
 
+static inline Result<ipc::IpcReadOptions> GetReadOptions(
+    const Schema& schema, FileFormat* format, const ScanOptions* scan_options) 
{

Review comment:
       Same with `format`.

##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -1009,6 +1010,47 @@ struct FileWriterHelper {
   int64_t footer_offset_;
 };
 
+struct FileGeneratorWriterHelper : public FileWriterHelper {
+  Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* 
out_batches,
+                     ReadStats* out_stats = nullptr) override {
+    auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
+    AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+
+    {
+      auto fut =
+          RecordBatchFileReader::OpenAsync(buf_reader.get(), footer_offset_, 
options);
+      RETURN_NOT_OK(fut.status());

Review comment:
       This is redundant since the next line will check it.

##########
File path: cpp/src/arrow/ipc/message.cc
##########
@@ -324,6 +325,60 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t 
offset, int32_t metadata_le
   }
 }
 
+Future<std::shared_ptr<Message>> ReadMessageAsync(int64_t offset, int32_t 
metadata_length,
+                                                  int64_t body_length,
+                                                  io::RandomAccessFile* file,
+                                                  const io::IOContext& 
context) {
+  struct State {
+    std::unique_ptr<Message> result;
+    std::shared_ptr<MessageDecoderListener> listener;
+    std::shared_ptr<MessageDecoder> decoder;
+  };
+  auto state = std::make_shared<State>();

Review comment:
       Since `State` is already held in a `shared_ptr`, do `result`, `listener`, and 
`decoder` need to be separate allocations?

##########
File path: cpp/src/arrow/ipc/read_write_benchmark.cc
##########
@@ -188,9 +159,103 @@ static void DecodeStream(benchmark::State& state) {  // 
NOLINT non-const referen
   state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize);
 }
 
+#define GENERATE_COMPRESSED_DATA_IN_MEMORY()                                   
   \
+  constexpr int64_t kTotalSize = 1 << 20; /* 1 MB */                           
   \

Review comment:
       Maybe `kBatchSize`?  `kTotalSize` makes me think it is the size of all 
the batches.

##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -1009,6 +1010,47 @@ struct FileWriterHelper {
   int64_t footer_offset_;
 };
 
+struct FileGeneratorWriterHelper : public FileWriterHelper {
+  Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* 
out_batches,
+                     ReadStats* out_stats = nullptr) override {
+    auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
+    AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+
+    {
+      auto fut =
+          RecordBatchFileReader::OpenAsync(buf_reader.get(), footer_offset_, 
options);
+      RETURN_NOT_OK(fut.status());
+      EXPECT_FINISHES_OK_AND_ASSIGN(auto reader, fut);
+      EXPECT_EQ(num_batches_written_, reader->num_record_batches());
+      // Generator's lifetime is independent of the reader's

Review comment:
       Is this still true?  I read this as "the lifetimes are not managed 
together, therefore we need to keep it alive"  but I think the generator is 
keeping the reader alive (indeed, I think you have set this test up to verify 
that).

##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -1009,6 +1010,47 @@ struct FileWriterHelper {
   int64_t footer_offset_;
 };
 
+struct FileGeneratorWriterHelper : public FileWriterHelper {
+  Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* 
out_batches,
+                     ReadStats* out_stats = nullptr) override {
+    auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
+    AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+
+    {
+      auto fut =
+          RecordBatchFileReader::OpenAsync(buf_reader.get(), footer_offset_, 
options);
+      RETURN_NOT_OK(fut.status());
+      EXPECT_FINISHES_OK_AND_ASSIGN(auto reader, fut);
+      EXPECT_EQ(num_batches_written_, reader->num_record_batches());
+      // Generator's lifetime is independent of the reader's
+      ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator());
+    }
+
+    // Generator is async-reentrant
+    std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
+    for (int i = 0; i < num_batches_written_; ++i) {
+      futures.push_back(generator());
+    }
+    auto fut = generator();
+    EXPECT_FINISHES_OK_AND_ASSIGN(auto extra_read, fut);

Review comment:
       We have an `ASSERT_FINISHES_OK_AND_EQ`, so maybe we can add the `EXPECT` 
equivalent (or just use the assert version).

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1175,6 +1284,92 @@ Result<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::Open(
   return result;
 }
 
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    const std::shared_ptr<io::RandomAccessFile>& file, const IpcReadOptions& 
options) {
+  ARROW_ASSIGN_OR_RAISE(int64_t footer_offset, file->GetSize());
+  return OpenAsync(std::move(file), footer_offset, options);
+}
+
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    io::RandomAccessFile* file, const IpcReadOptions& options) {
+  ARROW_ASSIGN_OR_RAISE(int64_t footer_offset, file->GetSize());
+  return OpenAsync(file, footer_offset, options);
+}
+
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset,
+    const IpcReadOptions& options) {
+  auto result = std::make_shared<RecordBatchFileReaderImpl>();
+  return result->OpenAsync(file, footer_offset, options)
+      .Then(
+          [=](...) -> Result<std::shared_ptr<RecordBatchFileReader>> { return 
result; });
+}
+
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    io::RandomAccessFile* file, int64_t footer_offset, const IpcReadOptions& 
options) {
+  auto result = std::make_shared<RecordBatchFileReaderImpl>();
+  return result->OpenAsync(file, footer_offset, options)
+      .Then(
+          [=](...) -> Result<std::shared_ptr<RecordBatchFileReader>> { return 
result; });
+}
+
+Future<IpcFileRecordBatchGenerator::Item> 
IpcFileRecordBatchGenerator::operator()() {
+  auto state = state_;
+  if (!read_dictionaries_.is_valid()) {
+    std::vector<Future<std::shared_ptr<Message>>> 
messages(state->num_dictionaries());
+    for (int i = 0; i < state->num_dictionaries(); i++) {
+      auto block = 
FileBlockFromFlatbuffer(state->footer_->dictionaries()->Get(i));
+      messages[i] = ReadMessageFromBlockAsync(block, state->file_, 
io_context_);
+    }
+    auto read_messages = All(std::move(messages));
+    if (executor_) read_messages = executor_->Transfer(read_messages);
+    read_dictionaries_ = read_messages.Then(
+        [=](const std::vector<Result<std::shared_ptr<Message>>> maybe_messages)
+            -> Result<std::shared_ptr<Message>> {
+          std::vector<std::shared_ptr<Message>> 
messages(state->num_dictionaries());
+          for (size_t i = 0; i < messages.size(); i++) {
+            ARROW_ASSIGN_OR_RAISE(messages[i], maybe_messages[i]);
+          }
+          return ReadDictionaries(state.get(), std::move(messages));
+        });
+  }
+  if (index_ >= state_->num_record_batches()) {
+    return Future<Item>::MakeFinished(IterationTraits<Item>::End());
+  }
+  auto block = 
FileBlockFromFlatbuffer(state->footer_->recordBatches()->Get(index_++));
+  auto read_message = ReadMessageFromBlockAsync(block, state->file_, 
io_context_);
+  std::vector<Future<std::shared_ptr<Message>>> 
dependencies{read_dictionaries_,
+                                                             
std::move(read_message)};
+  auto read_messages = All(dependencies);
+  if (executor_) read_messages = executor_->Transfer(read_messages);
+  return read_messages.Then(
+      [=](const std::vector<Result<std::shared_ptr<Message>>> maybe_messages)
+          -> Result<Item> {
+        RETURN_NOT_OK(maybe_messages[0]);  // Make sure dictionaries were read
+        ARROW_ASSIGN_OR_RAISE(auto message, maybe_messages[1]);
+        return ReadRecordBatch(state.get(), message.get());
+      });
+}
+
+Result<std::shared_ptr<Message>> IpcFileRecordBatchGenerator::ReadDictionaries(
+    RecordBatchFileReaderImpl* state,
+    std::vector<std::shared_ptr<Message>> dictionary_messages) {
+  IpcReadContext context(&state->dictionary_memo_, state->options_, 
state->swap_endian_);
+  for (const auto& message : dictionary_messages) {
+    RETURN_NOT_OK(ReadOneDictionary(message.get(), context));
+  }
+  return nullptr;
+}
+
+Result<std::shared_ptr<RecordBatch>> 
IpcFileRecordBatchGenerator::ReadRecordBatch(
+    RecordBatchFileReaderImpl* state, Message* message) {
+  CHECK_HAS_BODY(*message);

Review comment:
       Why a macro instead of a small helper function?

##########
File path: cpp/src/arrow/ipc/reader.h
##########
@@ -147,6 +149,26 @@ class ARROW_EXPORT RecordBatchFileReader {
       const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset,
       const IpcReadOptions& options = IpcReadOptions::Defaults());
 
+  /// \brief Open a file asynchronously (owns the file).
+  static Future<std::shared_ptr<RecordBatchFileReader>> OpenAsync(
+      const std::shared_ptr<io::RandomAccessFile>& file,

Review comment:
       This overload seems superfluous to me given there is one that takes 
`io::RandomAccessFile*` already.

##########
File path: cpp/src/arrow/ipc/reader.h
##########
@@ -147,6 +149,26 @@ class ARROW_EXPORT RecordBatchFileReader {
       const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset,
       const IpcReadOptions& options = IpcReadOptions::Defaults());
 
+  /// \brief Open a file asynchronously (owns the file).
+  static Future<std::shared_ptr<RecordBatchFileReader>> OpenAsync(
+      const std::shared_ptr<io::RandomAccessFile>& file,
+      const IpcReadOptions& options = IpcReadOptions::Defaults());
+
+  /// \brief Open a file asynchronously (borrows the file).
+  static Future<std::shared_ptr<RecordBatchFileReader>> OpenAsync(
+      io::RandomAccessFile* file,
+      const IpcReadOptions& options = IpcReadOptions::Defaults());
+
+  /// \brief Open a file asynchronously (owns the file).
+  static Future<std::shared_ptr<RecordBatchFileReader>> OpenAsync(
+      const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset,

Review comment:
       Same as above, do we need both versions?

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1175,6 +1284,92 @@ Result<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::Open(
   return result;
 }
 
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    const std::shared_ptr<io::RandomAccessFile>& file, const IpcReadOptions& 
options) {
+  ARROW_ASSIGN_OR_RAISE(int64_t footer_offset, file->GetSize());
+  return OpenAsync(std::move(file), footer_offset, options);
+}
+
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    io::RandomAccessFile* file, const IpcReadOptions& options) {
+  ARROW_ASSIGN_OR_RAISE(int64_t footer_offset, file->GetSize());
+  return OpenAsync(file, footer_offset, options);
+}
+
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset,
+    const IpcReadOptions& options) {
+  auto result = std::make_shared<RecordBatchFileReaderImpl>();
+  return result->OpenAsync(file, footer_offset, options)
+      .Then(
+          [=](...) -> Result<std::shared_ptr<RecordBatchFileReader>> { return 
result; });
+}
+
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    io::RandomAccessFile* file, int64_t footer_offset, const IpcReadOptions& 
options) {
+  auto result = std::make_shared<RecordBatchFileReaderImpl>();
+  return result->OpenAsync(file, footer_offset, options)
+      .Then(
+          [=](...) -> Result<std::shared_ptr<RecordBatchFileReader>> { return 
result; });
+}
+
+Future<IpcFileRecordBatchGenerator::Item> 
IpcFileRecordBatchGenerator::operator()() {
+  auto state = state_;
+  if (!read_dictionaries_.is_valid()) {
+    std::vector<Future<std::shared_ptr<Message>>> 
messages(state->num_dictionaries());
+    for (int i = 0; i < state->num_dictionaries(); i++) {
+      auto block = 
FileBlockFromFlatbuffer(state->footer_->dictionaries()->Get(i));
+      messages[i] = ReadMessageFromBlockAsync(block, state->file_, 
io_context_);

Review comment:
       This could trigger a lot of small reads depending on record batch 
configuration but I think this is tackled with coalescing?

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1175,6 +1284,92 @@ Result<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::Open(
   return result;
 }
 
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    const std::shared_ptr<io::RandomAccessFile>& file, const IpcReadOptions& 
options) {
+  ARROW_ASSIGN_OR_RAISE(int64_t footer_offset, file->GetSize());
+  return OpenAsync(std::move(file), footer_offset, options);
+}
+
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    io::RandomAccessFile* file, const IpcReadOptions& options) {
+  ARROW_ASSIGN_OR_RAISE(int64_t footer_offset, file->GetSize());
+  return OpenAsync(file, footer_offset, options);
+}
+
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset,
+    const IpcReadOptions& options) {
+  auto result = std::make_shared<RecordBatchFileReaderImpl>();
+  return result->OpenAsync(file, footer_offset, options)
+      .Then(
+          [=](...) -> Result<std::shared_ptr<RecordBatchFileReader>> { return 
result; });
+}
+
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    io::RandomAccessFile* file, int64_t footer_offset, const IpcReadOptions& 
options) {
+  auto result = std::make_shared<RecordBatchFileReaderImpl>();
+  return result->OpenAsync(file, footer_offset, options)
+      .Then(
+          [=](...) -> Result<std::shared_ptr<RecordBatchFileReader>> { return 
result; });
+}
+
+Future<IpcFileRecordBatchGenerator::Item> 
IpcFileRecordBatchGenerator::operator()() {
+  auto state = state_;
+  if (!read_dictionaries_.is_valid()) {
+    std::vector<Future<std::shared_ptr<Message>>> 
messages(state->num_dictionaries());
+    for (int i = 0; i < state->num_dictionaries(); i++) {
+      auto block = 
FileBlockFromFlatbuffer(state->footer_->dictionaries()->Get(i));
+      messages[i] = ReadMessageFromBlockAsync(block, state->file_, 
io_context_);
+    }
+    auto read_messages = All(std::move(messages));
+    if (executor_) read_messages = executor_->Transfer(read_messages);
+    read_dictionaries_ = read_messages.Then(
+        [=](const std::vector<Result<std::shared_ptr<Message>>> maybe_messages)
+            -> Result<std::shared_ptr<Message>> {
+          std::vector<std::shared_ptr<Message>> 
messages(state->num_dictionaries());
+          for (size_t i = 0; i < messages.size(); i++) {
+            ARROW_ASSIGN_OR_RAISE(messages[i], maybe_messages[i]);
+          }
+          return ReadDictionaries(state.get(), std::move(messages));
+        });
+  }
+  if (index_ >= state_->num_record_batches()) {
+    return Future<Item>::MakeFinished(IterationTraits<Item>::End());
+  }
+  auto block = 
FileBlockFromFlatbuffer(state->footer_->recordBatches()->Get(index_++));
+  auto read_message = ReadMessageFromBlockAsync(block, state->file_, 
io_context_);
+  std::vector<Future<std::shared_ptr<Message>>> 
dependencies{read_dictionaries_,

Review comment:
       I agree with the comment above that this forces `read_dictionaries_` to 
be a rather odd future.  Is there any reason you don't want to do:
   
   ```
   auto read_messages = read_dictionaries_.Then([] (...) {return 
read_message;});
   ```
   
   I think it cleans up the surrounding code nicely and you can change 
`read_dictionaries_` to `Future<>`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to