lidavidm commented on a change in pull request #9656:
URL: https://github.com/apache/arrow/pull/9656#discussion_r591776743



##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1175,6 +1273,86 @@ Result<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::Open(
   return result;
 }
 
+Future<IpcMessageGenerator::Item> IpcMessageGenerator::operator()() {
+  if (dictionary_index_ < state_->num_dictionaries()) {
+    auto block = FileBlockFromFlatbuffer(
+        state_->footer_->dictionaries()->Get(dictionary_index_++));
+    ARROW_ASSIGN_OR_RAISE(auto fut,
+                          executor_->Submit(&ReadMessageFromBlock, block, 
state_->file_));
+    return fut;
+  } else if (record_batch_index_ < state_->num_record_batches()) {
+    auto block = FileBlockFromFlatbuffer(
+        state_->footer_->recordBatches()->Get(record_batch_index_++));
+    ARROW_ASSIGN_OR_RAISE(auto fut,
+                          executor_->Submit(&ReadMessageFromBlock, block, 
state_->file_));
+    return fut;
+  }
+  return Future<Item>::MakeFinished(IterationTraits<Item>::End());
+}
+
+Future<IpcFileRecordBatchGenerator::Item> 
IpcFileRecordBatchGenerator::operator()() {
+  if (!read_dictionaries_.is_valid()) {
+    std::vector<Future<std::shared_ptr<Message>>> dictionary_messages(
+        state_->num_dictionaries());
+    for (int i = 0; i < state_->num_dictionaries(); i++) {
+      dictionary_messages[i] = message_generator_();
+    }
+    auto dictionaries_read = arrow::All(std::move(dictionary_messages));

Review comment:
       Here, I actually want the results, not just the status. I've renamed 
this: the future only reads the dictionary messages; it does not parse 
them.

##########
File path: cpp/src/arrow/ipc/read_write_benchmark.cc
##########
@@ -188,10 +188,91 @@ static void DecodeStream(benchmark::State& state) {  // 
NOLINT non-const referen
   state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize);
 }
 
+static void ReadCompressedFile(benchmark::State& state) {  // NOLINT non-const 
reference
+  // 1MB
+  constexpr int64_t kTotalSize = 1 << 20;
+  constexpr int64_t kBatches = 128;
+  auto options = ipc::IpcWriteOptions::Defaults();
+  ASSIGN_OR_ABORT(options.codec,
+                  arrow::util::Codec::Create(arrow::Compression::type::ZSTD));
+
+  std::shared_ptr<ResizableBuffer> buffer = *AllocateResizableBuffer(1024);
+  {
+    // Make Arrow IPC file
+    auto record_batch = MakeRecordBatch(kTotalSize, state.range(0));
+
+    io::BufferOutputStream stream(buffer);
+
+    auto writer = *ipc::MakeFileWriter(&stream, record_batch->schema(), 
options);
+    for (int i = 0; i < kBatches; i++) {
+      ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch));
+    }
+    ABORT_NOT_OK(writer->Close());
+    ABORT_NOT_OK(stream.Close());
+  }
+
+  ipc::DictionaryMemo empty_memo;
+  for (auto _ : state) {
+    io::BufferReader input(buffer);
+    auto reader =
+        *ipc::RecordBatchFileReader::Open(&input, 
ipc::IpcReadOptions::Defaults());
+    const int num_batches = reader->num_record_batches();
+    for (int i = 0; i < num_batches; ++i) {
+      auto batch = *reader->ReadRecordBatch(i);
+    }
+  }
+  state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize * kBatches);
+}
+
+static void ReadaheadCompressedFile(
+    benchmark::State& state) {  // NOLINT non-const reference
+  // 1MB
+  constexpr int64_t kTotalSize = 1 << 20;
+  constexpr int64_t kBatches = 128;
+  auto options = ipc::IpcWriteOptions::Defaults();
+  ASSIGN_OR_ABORT(options.codec,
+                  arrow::util::Codec::Create(arrow::Compression::type::ZSTD));
+
+  std::shared_ptr<ResizableBuffer> buffer = *AllocateResizableBuffer(1024);
+  {
+    // Make Arrow IPC file
+    auto record_batch = MakeRecordBatch(kTotalSize, state.range(0));
+
+    io::BufferOutputStream stream(buffer);
+
+    auto writer = *ipc::MakeFileWriter(&stream, record_batch->schema(), 
options);
+    for (int i = 0; i < kBatches; i++) {
+      ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch));
+    }
+    ABORT_NOT_OK(writer->Close());
+    ABORT_NOT_OK(stream.Close());
+  }
+
+  ipc::DictionaryMemo empty_memo;
+  for (auto _ : state) {
+    io::BufferReader input(buffer);
+    auto reader =
+        *ipc::RecordBatchFileReader::Open(&input, 
ipc::IpcReadOptions::Defaults());
+    ASSIGN_OR_ABORT(auto generator,
+                    reader->GetRecordBatchGenerator(/*readahead_messages=*/4,
+                                                    io::default_io_context(),
+                                                    
arrow::internal::GetCpuThreadPool()));
+    generator = MakeReadaheadGenerator(std::move(generator), /*readahead=*/4);

Review comment:
       I think the message-level readahead is actually the redundant one here: 
reading the messages is trivial (the file is in memory), whereas the decoding 
work is nontrivial (decompression). Note that the decompression isn't done in 
the IpcMessageGenerator; it is done as part of decoding the record batch. 
   
   With both sets of readahead:
   ```
   ReadaheadCompressedFile/1/real_time      65481827 ns      4493732 ns         
   8 bytes_per_second=1.90893G/s
   ReadaheadCompressedFile/4/real_time      29108664 ns      3322772 ns         
  24 bytes_per_second=4.29425G/s
   ReadaheadCompressedFile/16/real_time     31461325 ns      6222532 ns         
  22 bytes_per_second=3.97313G/s
   ReadaheadCompressedFile/64/real_time     44750635 ns     24064817 ns         
  16 bytes_per_second=2.79326G/s
   ReadaheadCompressedFile/256/real_time   122477440 ns     91270202 ns         
   6 bytes_per_second=1045.09M/s
   ReadaheadCompressedFile/1024/real_time  515403669 ns    381340640 ns         
   1 bytes_per_second=248.349M/s
   ```
   
   With only I/O-level readahead:
   ```
   ReadaheadCompressedFile/1/real_time     533258427 ns     11385747 ns         
   1 bytes_per_second=240.034M/s
   ReadaheadCompressedFile/4/real_time      71233474 ns      2145576 ns         
   9 bytes_per_second=1.75479G/s
   ReadaheadCompressedFile/16/real_time     45455989 ns      3591831 ns         
  15 bytes_per_second=2.74991G/s
   ReadaheadCompressedFile/64/real_time     64612808 ns      8730379 ns         
  11 bytes_per_second=1.9346G/s
   ReadaheadCompressedFile/256/real_time   188120059 ns     18770867 ns         
   4 bytes_per_second=680.417M/s
   ReadaheadCompressedFile/1024/real_time  699025221 ns     58355812 ns         
   1 bytes_per_second=183.112M/s
   ```
   
   With only batch-level readahead:
   ```
   ReadaheadCompressedFile/1/real_time      59513029 ns      3280963 ns         
  11 bytes_per_second=2.10038G/s
   ReadaheadCompressedFile/4/real_time      27160986 ns      2378473 ns         
  25 bytes_per_second=4.60219G/s
   ReadaheadCompressedFile/16/real_time     30018574 ns      3746521 ns         
  23 bytes_per_second=4.16409G/s
   ReadaheadCompressedFile/64/real_time     41358054 ns     11515016 ns         
  17 bytes_per_second=3.02239G/s
   ReadaheadCompressedFile/256/real_time    80396235 ns     26029905 ns         
   9 bytes_per_second=1.5548G/s
   ReadaheadCompressedFile/1024/real_time  473883489 ns     96278350 ns         
   2 bytes_per_second=270.109M/s
   ```
   
   With no readahead:
   ```
   ReadaheadCompressedFile/1/real_time     544947010 ns      8763782 ns         
   2 bytes_per_second=234.885M/s
   ReadaheadCompressedFile/4/real_time      73232625 ns      1522407 ns         
   8 bytes_per_second=1.70689G/s
   ReadaheadCompressedFile/16/real_time     47781962 ns      2977593 ns         
  15 bytes_per_second=2.61605G/s
   ReadaheadCompressedFile/64/real_time     64026555 ns      7882798 ns         
  11 bytes_per_second=1.95231G/s
   ReadaheadCompressedFile/256/real_time   187098473 ns     18156513 ns         
   4 bytes_per_second=684.132M/s
   ReadaheadCompressedFile/1024/real_time  696976648 ns     57355852 ns         
   1 bytes_per_second=183.65M/s
   ```
   
   So I'll change this to test only batch-level readahead. I/O-level readahead 
would help more on something like S3, which we could set up a benchmark for as 
well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to