lidavidm commented on a change in pull request #11616:
URL: https://github.com/apache/arrow/pull/11616#discussion_r766761232



##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -2715,6 +2727,133 @@ TEST(TestRecordBatchFileReaderIo, 
ReadTwoContinousFieldsWithIoMerged) {
   GetReadRecordBatchReadRanges(64, {0, 1}, {8 + 64 * 4});
 }
 
+constexpr static int kNumBatches = 10;
+// It can be difficult to know the exact size of the schema.  Instead we just 
make the
+// row data big enough that we can easily identify if a read is for a schema 
or for
+// row data.
+//
+// This needs to be large enough to space record batches kDefaultHoleSizeLimit 
bytes apart
+// and also large enough that record batch data is more than 
kMaxMetadataSizeBytes bytes
+constexpr static int kRowsPerBatch = 1000;
+constexpr static int64_t kMaxMetadataSizeBytes = 1 << 13;
+// There are always 2 reads when the file is opened
+constexpr static int kNumReadsOnOpen = 2;
+
+class PreBufferingTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override {
+    file_buffer_ = MakeBooleanInt32Int64File(kRowsPerBatch, kNumBatches);
+  }
+
+  void OpenReader() {
+    buffer_reader_ = make_unique<io::BufferReader>(file_buffer_);
+    tracked_ = std::make_shared<TrackedRandomAccessFile>(buffer_reader_.get());
+    auto read_options = IpcReadOptions::Defaults();
+    if (ReadsArePlugged()) {
+      // This will ensure that all reads get globbed together into one large 
read
+      read_options.pre_buffer_cache_options.hole_size_limit =
+          std::numeric_limits<int64_t>::max() - 1;
+      read_options.pre_buffer_cache_options.range_size_limit =
+          std::numeric_limits<int64_t>::max();
+    }
+    ASSERT_OK_AND_ASSIGN(reader_, RecordBatchFileReader::Open(tracked_, 
read_options));
+  }
+
+  bool ReadsArePlugged() { return GetParam(); }
+
+  std::vector<int> AllBatchIndices() {
+    std::vector<int> all_batch_indices(kNumBatches);
+    std::iota(all_batch_indices.begin(), all_batch_indices.end(), 0);
+    return all_batch_indices;
+  }
+
+  void AssertMetadataLoaded(std::vector<int> batch_indices) {
+    if (batch_indices.size() == 0) {
+      batch_indices = AllBatchIndices();
+    }
+    const auto& read_ranges = tracked_->get_read_ranges();
+    if (ReadsArePlugged()) {
+      // The read should have arrived as one large read
+      ASSERT_EQ(kNumReadsOnOpen + 1, read_ranges.size());
+      if (batch_indices.size() > 1) {
+        ASSERT_GT(read_ranges[kNumReadsOnOpen].length, kMaxMetadataSizeBytes);
+      }
+    } else {
+      // We should get many small reads of metadata only
+      ASSERT_EQ(batch_indices.size() + kNumReadsOnOpen, read_ranges.size());
+      for (const auto& read_range : read_ranges) {
+        ASSERT_LT(read_range.length, kMaxMetadataSizeBytes);
+      }
+    }
+  }
+
+  std::vector<std::shared_ptr<RecordBatch>> LoadExpected() {
+    auto buffer_reader = make_unique<io::BufferReader>(file_buffer_);
+    auto read_options = IpcReadOptions::Defaults();
+    EXPECT_OK_AND_ASSIGN(auto reader,
+                         RecordBatchFileReader::Open(buffer_reader.get(), 
read_options));
+    std::vector<std::shared_ptr<RecordBatch>> expected_batches;
+    for (int i = 0; i < reader->num_record_batches(); i++) {
+      EXPECT_OK_AND_ASSIGN(auto expected_batch, reader->ReadRecordBatch(i));
+      expected_batches.push_back(expected_batch);
+    }
+    return expected_batches;
+  }
+
+  void CheckFileRead(int num_indices_pre_buffered) {
+    auto expected_batches = LoadExpected();
+    const std::vector<io::ReadRange>& read_ranges = 
tracked_->get_read_ranges();
+    std::size_t starting_reads = read_ranges.size();
+    for (int i = 0; i < reader_->num_record_batches(); i++) {
+      ASSERT_OK_AND_ASSIGN(auto next_batch, reader_->ReadRecordBatch(i));
+      AssertBatchesEqual(*expected_batches[i], *next_batch);
+    }
+    int metadata_reads = 0;
+    int data_reads = 0;
+    for (std::size_t i = starting_reads; i < read_ranges.size(); i++) {
+      if (read_ranges[i].length > kMaxMetadataSizeBytes) {

Review comment:
       Hmm, the reads are not consolidated because they're too large or too far 
apart?

##########
File path: cpp/src/arrow/ipc/read_write_benchmark.cc
##########
@@ -49,9 +51,29 @@ std::shared_ptr<RecordBatch> MakeRecordBatch(int64_t 
total_size, int64_t num_fie
   return RecordBatch::Make(schema, length, arrays);
 }
 
+std::vector<int> GetIncludedFields(int64_t num_fields, int64_t 
is_partial_read) {
+  if (is_partial_read) {
+    std::vector<int> field_indices;
+    for (int i = 0; i < num_fields; i += 8) {
+      field_indices.push_back(i);
+    }
+    return field_indices;
+  } else {
+    return std::vector<int>();
+  }
+}
+
+int64_t BytesPerIteration(int64_t num_fields, int64_t is_partial_read, int64_t 
batch_size, int64_t num_batches) {
+  std::size_t num_actual_fields = GetIncludedFields(num_fields, 
is_partial_read).size();
+  double selectivity = num_actual_fields / static_cast<double>(num_fields);
+  if (num_actual_fields == 0) selectivity = 1;
+  auto bytes = batch_size * num_batches * selectivity;
+  return static_cast<int64_t>(bytes);
+}
+
 static void WriteRecordBatch(benchmark::State& state) {  // NOLINT non-const 
reference
   // 1MB
-  constexpr int64_t kTotalSize = 1 << 20;
+  constexpr int64_t kTotalSize = 1 << 23;

Review comment:
       Changing the size will effectively invalidate previous benchmark numbers 
right?

##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -2715,6 +2727,133 @@ TEST(TestRecordBatchFileReaderIo, 
ReadTwoContinousFieldsWithIoMerged) {
   GetReadRecordBatchReadRanges(64, {0, 1}, {8 + 64 * 4});
 }
 
+constexpr static int kNumBatches = 10;
+// It can be difficult to know the exact size of the schema.  Instead we just 
make the
+// row data big enough that we can easily identify if a read is for a schema 
or for
+// row data.
+//
+// This needs to be large enough to space record batches kDefaultHoleSizeLimit 
bytes apart
+// and also large enough that record batch data is more than 
kMaxMetadataSizeBytes bytes
+constexpr static int kRowsPerBatch = 1000;
+constexpr static int64_t kMaxMetadataSizeBytes = 1 << 13;
+// There are always 2 reads when the file is opened
+constexpr static int kNumReadsOnOpen = 2;
+
+class PreBufferingTest : public ::testing::TestWithParam<bool> {
+ protected:
+  void SetUp() override {
+    file_buffer_ = MakeBooleanInt32Int64File(kRowsPerBatch, kNumBatches);
+  }
+
+  void OpenReader() {
+    buffer_reader_ = make_unique<io::BufferReader>(file_buffer_);
+    tracked_ = std::make_shared<TrackedRandomAccessFile>(buffer_reader_.get());
+    auto read_options = IpcReadOptions::Defaults();
+    if (ReadsArePlugged()) {
+      // This will ensure that all reads get globbed together into one large 
read
+      read_options.pre_buffer_cache_options.hole_size_limit =
+          std::numeric_limits<int64_t>::max() - 1;
+      read_options.pre_buffer_cache_options.range_size_limit =
+          std::numeric_limits<int64_t>::max();
+    }
+    ASSERT_OK_AND_ASSIGN(reader_, RecordBatchFileReader::Open(tracked_, 
read_options));
+  }
+
+  bool ReadsArePlugged() { return GetParam(); }
+
+  std::vector<int> AllBatchIndices() {
+    std::vector<int> all_batch_indices(kNumBatches);
+    std::iota(all_batch_indices.begin(), all_batch_indices.end(), 0);
+    return all_batch_indices;
+  }
+
+  void AssertMetadataLoaded(std::vector<int> batch_indices) {
+    if (batch_indices.size() == 0) {
+      batch_indices = AllBatchIndices();
+    }
+    const auto& read_ranges = tracked_->get_read_ranges();
+    if (ReadsArePlugged()) {
+      // The read should have arrived as one large read
+      ASSERT_EQ(kNumReadsOnOpen + 1, read_ranges.size());
+      if (batch_indices.size() > 1) {
+        ASSERT_GT(read_ranges[kNumReadsOnOpen].length, kMaxMetadataSizeBytes);
+      }
+    } else {
+      // We should get many small reads of metadata only
+      ASSERT_EQ(batch_indices.size() + kNumReadsOnOpen, read_ranges.size());
+      for (const auto& read_range : read_ranges) {
+        ASSERT_LT(read_range.length, kMaxMetadataSizeBytes);
+      }
+    }
+  }
+
+  std::vector<std::shared_ptr<RecordBatch>> LoadExpected() {
+    auto buffer_reader = make_unique<io::BufferReader>(file_buffer_);
+    auto read_options = IpcReadOptions::Defaults();
+    EXPECT_OK_AND_ASSIGN(auto reader,
+                         RecordBatchFileReader::Open(buffer_reader.get(), 
read_options));
+    std::vector<std::shared_ptr<RecordBatch>> expected_batches;
+    for (int i = 0; i < reader->num_record_batches(); i++) {
+      EXPECT_OK_AND_ASSIGN(auto expected_batch, reader->ReadRecordBatch(i));
+      expected_batches.push_back(expected_batch);
+    }
+    return expected_batches;
+  }
+
+  void CheckFileRead(int num_indices_pre_buffered) {
+    auto expected_batches = LoadExpected();
+    const std::vector<io::ReadRange>& read_ranges = 
tracked_->get_read_ranges();
+    std::size_t starting_reads = read_ranges.size();
+    for (int i = 0; i < reader_->num_record_batches(); i++) {
+      ASSERT_OK_AND_ASSIGN(auto next_batch, reader_->ReadRecordBatch(i));
+      AssertBatchesEqual(*expected_batches[i], *next_batch);
+    }
+    int metadata_reads = 0;
+    int data_reads = 0;
+    for (std::size_t i = starting_reads; i < read_ranges.size(); i++) {
+      if (read_ranges[i].length > kMaxMetadataSizeBytes) {

Review comment:
       Ah, I see now - metadata and data are always read separately to support 
column selection. I wonder if it'd be worth also optimizing for when no column 
selection is done.

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -147,6 +172,16 @@ class ArrayLoader {
       : metadata_(metadata),
         metadata_version_(metadata_version),
         file_(file),
+        offset_(0),

Review comment:
       Maybe `file_offset_`? My first instinct would be array offset

##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -2715,6 +2727,133 @@ TEST(TestRecordBatchFileReaderIo, 
ReadTwoContinousFieldsWithIoMerged) {
   GetReadRecordBatchReadRanges(64, {0, 1}, {8 + 64 * 4});
 }
 
+constexpr static int kNumBatches = 10;
+// It can be difficult to know the exact size of the schema.  Instead we just 
make the
+// row data big enough that we can easily identify if a read is for a schema 
or for
+// row data.
+//
+// This needs to be large enough to space record batches kDefaultHoleSizeLimit 
bytes apart
+// and also large enough that record batch data is more than 
kMaxMetadataSizeBytes bytes
+constexpr static int kRowsPerBatch = 1000;
+constexpr static int64_t kMaxMetadataSizeBytes = 1 << 13;
+// There are always 2 reads when the file is opened
+constexpr static int kNumReadsOnOpen = 2;

Review comment:
       This could be filed for a later JIRA, but we could perhaps skip one read 
by speculatively reading (say) 1 MB of the footer instead of reading the footer 
size and footer separately.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to