niyue commented on a change in pull request #11486:
URL: https://github.com/apache/arrow/pull/11486#discussion_r740213625



##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -2410,6 +2469,160 @@ TEST(DictionaryMemo, AddDictionaryType) {
   AssertMemoDictionaryType(memo, 44, utf8());
 }
 
+TEST(IoRecordedRandomAccessFile, IoRecording) {
+  // A freshly constructed 42-byte mock file has not recorded any reads yet.
+  IoRecordedRandomAccessFile recorded_file(42);
+  ASSERT_TRUE(recorded_file.GetRecordedReads().empty());
+
+  // Two disjoint reads are recorded as two separate ranges.
+  ASSERT_OK(recorded_file.ReadAt(/*position=*/1, /*nbytes=*/2));
+  ASSERT_EQ(recorded_file.GetRecordedReads().size(), 1);
+  ASSERT_EQ(recorded_file.GetRecordedReads()[0], (io::ReadRange{1, 2}));
+
+  ASSERT_OK(recorded_file.ReadAt(/*position=*/5, /*nbytes=*/3));
+  ASSERT_EQ(recorded_file.GetRecordedReads().size(), 2);
+  ASSERT_EQ(recorded_file.GetRecordedReads()[1], (io::ReadRange{5, 3}));
+
+  // A read that starts exactly where the previous one ended is merged into
+  // the previous range instead of producing a new entry.
+  ASSERT_OK(recorded_file.ReadAt(/*position=*/5 + 3, /*nbytes=*/6));
+  ASSERT_EQ(recorded_file.GetRecordedReads().size(), 2);
+  ASSERT_EQ(recorded_file.GetRecordedReads()[1], (io::ReadRange{5, 3 + 6}));
+
+  // Reading past the end of the file is not expected in practice, but the
+  // mock still reports success for it.
+  ASSERT_OK(recorded_file.ReadAt(/*position=*/43, /*nbytes=*/1));
+}
+
+TEST(IoRecordedRandomAccessFile, IoRecordingWithOutput) {
+  // ReadAt(position, nbytes, out) treats `out` as raw destination memory of at
+  // least `nbytes` bytes. Pass a real byte buffer, large enough for the biggest
+  // read below (6 bytes), rather than the address of a
+  // std::shared_ptr<Buffer>. Any implementation that actually copied bytes
+  // would overwrite that smart pointer, which is undefined behavior.
+  std::vector<uint8_t> out(16);
+  IoRecordedRandomAccessFile file(42);
+  ASSERT_TRUE(file.GetRecordedReads().empty());
+  ASSERT_EQ(file.ReadAt(1, 2, out.data()), 2L);
+  ASSERT_EQ(file.GetRecordedReads().size(), 1);
+  ASSERT_EQ(file.GetRecordedReads()[0], (io::ReadRange{1, 2}));
+
+  ASSERT_EQ(file.ReadAt(5, 1, out.data()), 1);
+  ASSERT_EQ(file.GetRecordedReads().size(), 2);
+  ASSERT_EQ(file.GetRecordedReads()[1], (io::ReadRange{5, 1}));
+
+  // contiguous IOs (the next read starts where the previous one ended) are merged
+  ASSERT_EQ(file.ReadAt(5 + 1, 6, out.data()), 6);
+  ASSERT_EQ(file.GetRecordedReads().size(), 2);
+  ASSERT_EQ(file.GetRecordedReads()[1], (io::ReadRange{5, 1 + 6}));
+}
+
+// Builds a record batch of `length` rows with three random columns:
+// f0: boolean, f1: int32, f2: int64. The `false` argument passed to the
+// generators presumably disables nulls (TODO confirm against test_common.h).
+Status MakeBooleanInt32Int64Batch(const int length, std::shared_ptr<RecordBatch>* out) {
+  auto* pool = arrow::default_memory_pool();
+
+  // Generate the columns in bool -> int32 -> int64 order, matching the field order.
+  std::shared_ptr<Array> bool_array;
+  std::shared_ptr<Array> int32_array;
+  std::shared_ptr<Array> int64_array;
+  RETURN_NOT_OK(MakeRandomBooleanArray(length, false, &bool_array));
+  RETURN_NOT_OK(MakeRandomInt32Array(length, false, pool, &int32_array));
+  RETURN_NOT_OK(MakeRandomInt64Array(length, false, pool, &int64_array));
+
+  auto batch_schema =
+      ::arrow::schema({field("f0", boolean()), field("f1", int32()), field("f2", int64())});
+  *out = RecordBatch::Make(batch_schema, length, {bool_array, int32_array, int64_array});
+  return Status::OK();
+}
+
+// Writes one [bool, int32, int64] record batch with `num_rows` rows to a mock
+// IPC file, reads it back, and checks the read IOs issued against the file.
+//
+// included_fields: indices of the fields to load; empty means all fields.
+// expected_body_read_lengths: expected length of each read IO issued for the
+//   record batch body, in order.
+//
+// Uses fatal ASSERT_* macros, so a failure returns from this helper only.
+void GetReadRecordBatchIoStats(uint32_t num_rows, const std::vector<int>& included_fields,
+                               const std::vector<int64_t>& expected_body_read_lengths) {
+  std::shared_ptr<RecordBatch> batch;
+  // [bool, int32, int64] batch
+  ASSERT_OK(MakeBooleanInt32Int64Batch(static_cast<int>(num_rows), &batch));
+
+  MockFileWriterHelper helper("testdir/rb_file_reader_io.arrow");
+  ASSERT_OK(helper.Init(batch->schema(), IpcWriteOptions::Defaults()));
+  ASSERT_OK(helper.WriteBatch(batch));
+  ASSERT_OK(helper.Finish());
+
+  ReadStats read_stats;
+  RecordBatchVector out_batches;
+  auto read_options = IpcReadOptions::Defaults();
+  // if empty, return all fields
+  read_options.included_fields = included_fields;
+  std::vector<io::ReadRange> io_stats;
+  ASSERT_OK(helper.ReadBatches(read_options, &out_batches, &read_stats, &io_stats));
+  ASSERT_EQ(out_batches.size(), 1);
+  ASSERT_EQ(out_batches[0]->num_rows(), static_cast<int64_t>(num_rows));
+  // Compare int with int: num_columns() is signed, included_fields.size() is
+  // not, and mixing them inside ASSERT_EQ raises -Wsign-compare.
+  const int expected_num_columns =
+      included_fields.empty() ? 3 : static_cast<int>(included_fields.size());
+  ASSERT_EQ(out_batches[0]->num_columns(), expected_num_columns);
+
+  // there are 3 read IOs before reading body:
+  // 1) read magic and footer length IO
+  // 2) read footer IO
+  // 3) read record batch metadata IO
+  ASSERT_EQ(io_stats.size(), 3 + expected_body_read_lengths.size());
+  const auto magic_size = static_cast<int64_t>(strlen(ipc::internal::kArrowMagicBytes));
+  // read magic and footer length IO (kept signed to match ReadRange::length)
+  const int64_t file_end_size = magic_size + static_cast<int64_t>(sizeof(int32_t));
+  ASSERT_EQ(io_stats[0].length, file_end_size);
+  // NOTE(review): 256 and 240 are presumably the encoded footer / batch
+  // metadata sizes for this particular schema. They must be updated if the
+  // schema or the metadata encoding changes.
+  // read footer IO
+  ASSERT_EQ(io_stats[1].length, 256);
+  // read record batch metadata
+  ASSERT_EQ(io_stats[2].length, 240);
+  for (size_t i = 0; i < expected_body_read_lengths.size(); ++i) {
+    ASSERT_EQ(io_stats[3 + i].length, expected_body_read_lengths[i]);
+  }
+}
+
+// Convenience overload: performs the same IO check against a 5-row batch.
+void GetReadRecordBatchIoStats(const std::vector<int>& included_fields,
+                               const std::vector<int64_t>& expected_body_read_lengths) {
+  constexpr uint32_t kDefaultNumRows = 5;
+  GetReadRecordBatchIoStats(kDefaultNumRows, included_fields, expected_body_read_lengths);
+}
+
+TEST(TestRecordBatchFileReaderIo, LoadAllFieldsShouldReadTheEntireBody) {
+  // Loading every field reads the whole record batch body in a single IO.
+  // The 5-row batch body holds 5 bool + 5 int32 + 5 int64 values, and each
+  // buffer is padded to a multiple of 8 bytes:
+  // + 5 bool:  5 bits      ->  1 byte,  padded to  8 bytes
+  // + 5 int32: 5 * 4 bytes -> 20 bytes, padded to 24 bytes
+  // + 5 int64: 5 * 8 bytes -> 40 bytes, already a multiple of 8
+  GetReadRecordBatchIoStats({}, {8 + 24 + 40});
+}
+
+TEST(TestRecordBatchFileReaderIo, ReadSingleFieldAtTheStart) {
+  // Only the bool field (index 0) is loaded. Its 5 values take 5 bits, so a
+  // single 1-byte body read is expected.
+  const std::vector<int> included_fields = {0};
+  const std::vector<int64_t> expected_body_reads = {1};
+  GetReadRecordBatchIoStats(included_fields, expected_body_reads);
+}
+
+TEST(TestRecordBatchFileReaderIo, ReadSingleFieldInTheMiddle) {
+  // Only the int32 field (index 1) is loaded: 5 values * 4 bytes = 20 bytes.
+  const std::vector<int> included_fields = {1};
+  const std::vector<int64_t> expected_body_reads = {5 * 4};
+  GetReadRecordBatchIoStats(included_fields, expected_body_reads);
+}
+
+TEST(TestRecordBatchFileReaderIo, ReadSingleFieldInTheEnd) {
+  // Only the int64 field (index 2) is loaded: 5 values * 8 bytes = 40 bytes.
+  const std::vector<int> included_fields = {2};
+  const std::vector<int64_t> expected_body_reads = {5 * 8};
+  GetReadRecordBatchIoStats(included_fields, expected_body_reads);
+}
+
+TEST(TestRecordBatchFileReaderIo, SkipTheFieldInTheMiddle) {
+  // Load the bool and int64 fields but skip the int32 field between them.
+  // The skipped bytes separate the two ranges, so two body IOs are expected:
+  // + 5 bool:  5 bits      -> 1 byte
+  // + 5 int64: 5 * 8 bytes -> 40 bytes
+  const std::vector<int> included_fields = {0, 2};
+  const std::vector<int64_t> expected_body_reads = {1, 40};
+  GetReadRecordBatchIoStats(included_fields, expected_body_reads);
+}
+
+TEST(TestRecordBatchFileReaderIo, ReadTwoContinousFields) {
+  // Load the adjacent int32 and int64 fields. The 20-byte int32 buffer is
+  // padded to 24 bytes, so its data does not end where the int64 buffer
+  // begins, and the two reads stay separate:
+  // + 5 int32: 5 * 4 bytes -> 20 bytes
+  // + 5 int64: 5 * 8 bytes -> 40 bytes
+  const std::vector<int> included_fields = {1, 2};
+  const std::vector<int64_t> expected_body_reads = {20, 40};
+  GetReadRecordBatchIoStats(included_fields, expected_body_reads);
+}
+
+TEST(TestRecordBatchFileReaderIo, ReadTwoContinousFieldsWithIoMerged) {
+  // With 64 rows the bool buffer is exactly 64 bits (8 bytes) and needs no
+  // padding, so the int32 buffer starts right where the bool buffer ends.
+  // Loading both fields therefore produces two contiguous reads, which are
+  // merged into a single IO:
+  // + 64 bool:  64 bits      -> 8 bytes
+  // + 64 int32: 64 * 4 bytes -> 256 bytes
+  constexpr uint32_t kNumRows = 64;
+  const std::vector<int> included_fields = {0, 1};
+  const std::vector<int64_t> expected_body_reads = {8 + 64 * 4};
+  GetReadRecordBatchIoStats(kNumRows, included_fields, expected_body_reads);
+}

Review comment:
       Several RecordBatchFileReader IO-related tests have been added above.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to