lidavidm commented on a change in pull request #9656:
URL: https://github.com/apache/arrow/pull/9656#discussion_r591776743
##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1175,6 +1273,86 @@ Result<std::shared_ptr<RecordBatchFileReader>> RecordBatchFileReader::Open(
return result;
}
+Future<IpcMessageGenerator::Item> IpcMessageGenerator::operator()() {
+ if (dictionary_index_ < state_->num_dictionaries()) {
+ auto block = FileBlockFromFlatbuffer(
+ state_->footer_->dictionaries()->Get(dictionary_index_++));
+  ARROW_ASSIGN_OR_RAISE(auto fut,
+                        executor_->Submit(&ReadMessageFromBlock, block, state_->file_));
+ return fut;
+ } else if (record_batch_index_ < state_->num_record_batches()) {
+ auto block = FileBlockFromFlatbuffer(
+ state_->footer_->recordBatches()->Get(record_batch_index_++));
+  ARROW_ASSIGN_OR_RAISE(auto fut,
+                        executor_->Submit(&ReadMessageFromBlock, block, state_->file_));
+ return fut;
+ }
+ return Future<Item>::MakeFinished(IterationTraits<Item>::End());
+}
+
+Future<IpcFileRecordBatchGenerator::Item>
IpcFileRecordBatchGenerator::operator()() {
+ if (!read_dictionaries_.is_valid()) {
+ std::vector<Future<std::shared_ptr<Message>>> dictionary_messages(
+ state_->num_dictionaries());
+ for (int i = 0; i < state_->num_dictionaries(); i++) {
+ dictionary_messages[i] = message_generator_();
+ }
+ auto dictionaries_read = arrow::All(std::move(dictionary_messages));
Review comment:
Here, I actually want the results, not just the status. I've renamed
this accordingly — this future only reads the dictionary messages; it does
not actually parse them.
##########
File path: cpp/src/arrow/ipc/read_write_benchmark.cc
##########
@@ -188,10 +188,91 @@ static void DecodeStream(benchmark::State& state) { // NOLINT non-const reference
state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize);
}
+static void ReadCompressedFile(benchmark::State& state) { // NOLINT non-const reference
+ // 1MB
+ constexpr int64_t kTotalSize = 1 << 20;
+ constexpr int64_t kBatches = 128;
+ auto options = ipc::IpcWriteOptions::Defaults();
+ ASSIGN_OR_ABORT(options.codec,
+ arrow::util::Codec::Create(arrow::Compression::type::ZSTD));
+
+ std::shared_ptr<ResizableBuffer> buffer = *AllocateResizableBuffer(1024);
+ {
+ // Make Arrow IPC file
+ auto record_batch = MakeRecordBatch(kTotalSize, state.range(0));
+
+ io::BufferOutputStream stream(buffer);
+
+    auto writer = *ipc::MakeFileWriter(&stream, record_batch->schema(), options);
+ for (int i = 0; i < kBatches; i++) {
+ ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch));
+ }
+ ABORT_NOT_OK(writer->Close());
+ ABORT_NOT_OK(stream.Close());
+ }
+
+ ipc::DictionaryMemo empty_memo;
+ for (auto _ : state) {
+ io::BufferReader input(buffer);
+    auto reader =
+        *ipc::RecordBatchFileReader::Open(&input, ipc::IpcReadOptions::Defaults());
+ const int num_batches = reader->num_record_batches();
+ for (int i = 0; i < num_batches; ++i) {
+ auto batch = *reader->ReadRecordBatch(i);
+ }
+ }
+ state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize * kBatches);
+}
+
+static void ReadaheadCompressedFile(
+ benchmark::State& state) { // NOLINT non-const reference
+ // 1MB
+ constexpr int64_t kTotalSize = 1 << 20;
+ constexpr int64_t kBatches = 128;
+ auto options = ipc::IpcWriteOptions::Defaults();
+ ASSIGN_OR_ABORT(options.codec,
+ arrow::util::Codec::Create(arrow::Compression::type::ZSTD));
+
+ std::shared_ptr<ResizableBuffer> buffer = *AllocateResizableBuffer(1024);
+ {
+ // Make Arrow IPC file
+ auto record_batch = MakeRecordBatch(kTotalSize, state.range(0));
+
+ io::BufferOutputStream stream(buffer);
+
+    auto writer = *ipc::MakeFileWriter(&stream, record_batch->schema(), options);
+ for (int i = 0; i < kBatches; i++) {
+ ABORT_NOT_OK(writer->WriteRecordBatch(*record_batch));
+ }
+ ABORT_NOT_OK(writer->Close());
+ ABORT_NOT_OK(stream.Close());
+ }
+
+ ipc::DictionaryMemo empty_memo;
+ for (auto _ : state) {
+ io::BufferReader input(buffer);
+    auto reader =
+        *ipc::RecordBatchFileReader::Open(&input, ipc::IpcReadOptions::Defaults());
+    ASSIGN_OR_ABORT(auto generator,
+                    reader->GetRecordBatchGenerator(/*readahead_messages=*/4,
+                                                    io::default_io_context(),
+                                                    arrow::internal::GetCpuThreadPool()));
+ generator = MakeReadaheadGenerator(std::move(generator), /*readahead=*/4);
Review comment:
I think actually the one at the message level is the redundant one here,
since that's trivial (the file's in memory), but the decoding work is
nontrivial (compression). Note that the compression isn't done in the
IpcMessageGenerator, but is done as part of decoding the record batch.
With both sets of readahead:
```
ReadaheadCompressedFile/1/real_time       65481827 ns      4493732 ns            8 bytes_per_second=1.90893G/s
ReadaheadCompressedFile/4/real_time       29108664 ns      3322772 ns           24 bytes_per_second=4.29425G/s
ReadaheadCompressedFile/16/real_time      31461325 ns      6222532 ns           22 bytes_per_second=3.97313G/s
ReadaheadCompressedFile/64/real_time      44750635 ns     24064817 ns           16 bytes_per_second=2.79326G/s
ReadaheadCompressedFile/256/real_time    122477440 ns     91270202 ns            6 bytes_per_second=1045.09M/s
ReadaheadCompressedFile/1024/real_time   515403669 ns    381340640 ns            1 bytes_per_second=248.349M/s
```
With only I/O-level readahead:
```
ReadaheadCompressedFile/1/real_time      533258427 ns     11385747 ns            1 bytes_per_second=240.034M/s
ReadaheadCompressedFile/4/real_time       71233474 ns      2145576 ns            9 bytes_per_second=1.75479G/s
ReadaheadCompressedFile/16/real_time      45455989 ns      3591831 ns           15 bytes_per_second=2.74991G/s
ReadaheadCompressedFile/64/real_time      64612808 ns      8730379 ns           11 bytes_per_second=1.9346G/s
ReadaheadCompressedFile/256/real_time    188120059 ns     18770867 ns            4 bytes_per_second=680.417M/s
ReadaheadCompressedFile/1024/real_time   699025221 ns     58355812 ns            1 bytes_per_second=183.112M/s
```
With only batch-level readahead:
```
ReadaheadCompressedFile/1/real_time       59513029 ns      3280963 ns           11 bytes_per_second=2.10038G/s
ReadaheadCompressedFile/4/real_time       27160986 ns      2378473 ns           25 bytes_per_second=4.60219G/s
ReadaheadCompressedFile/16/real_time      30018574 ns      3746521 ns           23 bytes_per_second=4.16409G/s
ReadaheadCompressedFile/64/real_time      41358054 ns     11515016 ns           17 bytes_per_second=3.02239G/s
ReadaheadCompressedFile/256/real_time     80396235 ns     26029905 ns            9 bytes_per_second=1.5548G/s
ReadaheadCompressedFile/1024/real_time   473883489 ns     96278350 ns            2 bytes_per_second=270.109M/s
```
With no readahead:
```
ReadaheadCompressedFile/1/real_time      544947010 ns      8763782 ns            2 bytes_per_second=234.885M/s
ReadaheadCompressedFile/4/real_time       73232625 ns      1522407 ns            8 bytes_per_second=1.70689G/s
ReadaheadCompressedFile/16/real_time      47781962 ns      2977593 ns           15 bytes_per_second=2.61605G/s
ReadaheadCompressedFile/64/real_time      64026555 ns      7882798 ns           11 bytes_per_second=1.95231G/s
ReadaheadCompressedFile/256/real_time    187098473 ns     18156513 ns            4 bytes_per_second=684.132M/s
ReadaheadCompressedFile/1024/real_time   696976648 ns     57355852 ns            1 bytes_per_second=183.65M/s
```
So I'll change this to test only batch-level readahead. I/O-level readahead
would help more on something like S3, which we could set up a benchmark for as
well.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]