niyue commented on a change in pull request #11486:
URL: https://github.com/apache/arrow/pull/11486#discussion_r740561031



##########
File path: cpp/src/arrow/filesystem/mockfs.cc
##########
@@ -242,8 +242,32 @@ class MockFSInputStream : public io::BufferReader {
     return metadata_;
   }
 
+  Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out) 
override {
+    RecordReadIo(position, nbytes);
+    return io::BufferReader::DoReadAt(position, nbytes, out);
+  }
+
+  Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes) 
override {
+    RecordReadIo(position, nbytes);
+    return io::BufferReader::DoReadAt(position, nbytes);
+  }

Review comment:
       > While it's not merged yet, perhaps an approach like the one here will 
be easier? I don't think we need a full mock FileSystem, either - especially 
since the filesystem module isn't necessarily enabled. 
https://github.com/apache/arrow/pull/11535/files#diff-900c46995b5706697d6e4b010f610f1a1cf27d4d865afe48de0a800830ac676bL1708
   
   Got it. It seems the simplest way is to modify this 
`TrackedRandomAccessFile` to record not only the number of read IOs but also 
their offset/length. Since this PR is not merged yet, do you think I can copy 
the `TrackedRandomAccessFile` into my PR, add the necessary changes, and we 
can merge them later?

##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -1706,6 +1707,54 @@ TEST_F(TestFileFormat, ReadFieldSubset) { 
TestReadSubsetOfFields(); }
 
 TEST_F(TestFileFormatGenerator, ReadFieldSubset) { TestReadSubsetOfFields(); }
 
+class TrackedRandomAccessFile : public io::RandomAccessFile {
+ public:
+  explicit TrackedRandomAccessFile(io::RandomAccessFile* delegate)
+      : delegate_(delegate) {}
+
+  Status Close() override { return delegate_->Close(); }
+  bool closed() const override { return delegate_->closed(); }
+  Result<int64_t> Tell() const override { return delegate_->Tell(); }
+  Status Seek(int64_t position) override { return delegate_->Seek(position); }
+  Result<int64_t> Read(int64_t nbytes, void* out) override {
+    ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
+    save_read_range(position, nbytes);
+    return delegate_->Read(nbytes, out);
+  }
+  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
+    ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
+    save_read_range(position, nbytes);
+    return delegate_->Read(nbytes);
+  }
+  bool supports_zero_copy() const override { return 
delegate_->supports_zero_copy(); }
+  Result<int64_t> GetSize() override { return delegate_->GetSize(); }
+  Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override 
{
+    save_read_range(position, nbytes);
+    return delegate_->ReadAt(position, nbytes, out);
+  }
+  Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) 
override {
+    save_read_range(position, nbytes);
+    return delegate_->ReadAt(position, nbytes);
+  }
+  Future<std::shared_ptr<Buffer>> ReadAsync(const io::IOContext& io_context,
+                                            int64_t position, int64_t nbytes) 
override {
+    save_read_range(position, nbytes);
+    return delegate_->ReadAsync(io_context, position, nbytes);
+  }
+
+  int64_t num_reads() const { return read_ranges_.size(); }
+
+  const std::vector<io::ReadRange>& get_read_ranges() const { return 
read_ranges_; }
+
+ private:
+  io::RandomAccessFile* delegate_;
+  std::vector<io::ReadRange> read_ranges_;

Review comment:
       @lidavidm I copied the `TrackedRandomAccessFile` into this PR and 
track the read ranges using a vector. Since `num_reads` is just the length of 
this vector, I removed the `read_` member variable that exists in 
https://github.com/apache/arrow/pull/11535/files#diff-900c46995b5706697d6e4b010f610f1a1cf27d4d865afe48de0a800830ac676bL1708

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1358,6 +1398,62 @@ Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
       .Then([=]() -> Result<std::shared_ptr<RecordBatchFileReader>> { return 
result; });
 }
 
+Result<int64_t> IoRecordedRandomAccessFile::GetSize() { return file_size_; }
+
+Result<int64_t> IoRecordedRandomAccessFile::ReadAt(int64_t position, int64_t 
nbytes,
+                                                   void* out) {
+  auto num_bytes_read = std::min(file_size_, position + nbytes) - position;
+
+  if (!recorded_reads_.empty() &&
+      position == recorded_reads_.back().offset + 
recorded_reads_.back().length) {
+    // merge continuous IOs into one if possible
+    recorded_reads_.back().length += num_bytes_read;
+  } else {
+    // no real IO is performed, it is only saved into a vector for replaying 
later
+    recorded_reads_.emplace_back(io::ReadRange{position, num_bytes_read});
+  }
+  return num_bytes_read;
+}

Review comment:
       OK. I was not aware of this. I will fix it.

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1358,6 +1398,62 @@ Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
       .Then([=]() -> Result<std::shared_ptr<RecordBatchFileReader>> { return 
result; });
 }
 
+Result<int64_t> IoRecordedRandomAccessFile::GetSize() { return file_size_; }
+
+Result<int64_t> IoRecordedRandomAccessFile::ReadAt(int64_t position, int64_t 
nbytes,
+                                                   void* out) {
+  auto num_bytes_read = std::min(file_size_, position + nbytes) - position;
+
+  if (!recorded_reads_.empty() &&
+      position == recorded_reads_.back().offset + 
recorded_reads_.back().length) {
+    // merge continuous IOs into one if possible
+    recorded_reads_.back().length += num_bytes_read;
+  } else {
+    // no real IO is performed, it is only saved into a vector for replaying 
later
+    recorded_reads_.emplace_back(io::ReadRange{position, num_bytes_read});
+  }
+  return num_bytes_read;
+}

Review comment:
       OK. I was not aware of this. I will fix it.

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1358,6 +1398,62 @@ Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
       .Then([=]() -> Result<std::shared_ptr<RecordBatchFileReader>> { return 
result; });
 }
 
+Result<int64_t> IoRecordedRandomAccessFile::GetSize() { return file_size_; }
+
+Result<int64_t> IoRecordedRandomAccessFile::ReadAt(int64_t position, int64_t 
nbytes,
+                                                   void* out) {
+  auto num_bytes_read = std::min(file_size_, position + nbytes) - position;
+
+  if (!recorded_reads_.empty() &&
+      position == recorded_reads_.back().offset + 
recorded_reads_.back().length) {
+    // merge continuous IOs into one if possible
+    recorded_reads_.back().length += num_bytes_read;
+  } else {
+    // no real IO is performed, it is only saved into a vector for replaying 
later
+    recorded_reads_.emplace_back(io::ReadRange{position, num_bytes_read});
+  }
+  return num_bytes_read;
+}

Review comment:
       @westonpace I fixed the position advancing issue and added one more test 
case to cover it. 
   
   I also pushed a new commit that I hope addresses the lint issue as well 
(thanks to @kou, I have now verified the linting locally using the Docker image).

##########
File path: cpp/src/arrow/ipc/message.cc
##########
@@ -279,8 +280,37 @@ std::string FormatMessageType(MessageType type) {
   return "unknown";
 }
 
+Status ReadFieldsSubset(int64_t offset, int32_t metadata_length,
+                        io::RandomAccessFile* file,
+                        const FieldsLoaderFunction& fields_loader,
+                        const std::shared_ptr<Buffer>& metadata, int64_t 
required_size,
+                        std::shared_ptr<Buffer>& body) {
+  const flatbuf::Message* message = nullptr;
+  uint8_t continuation_metadata_size = sizeof(int32_t) + sizeof(int32_t);
+  // skip 8 bytes (32-bit continuation indicator + 32-bit little-endian length 
prefix)
+  RETURN_NOT_OK(internal::VerifyMessage(metadata->data() + 
continuation_metadata_size,
+                                        metadata->size() - 
continuation_metadata_size,
+                                        &message));
+
+  auto batch = message->header_as_RecordBatch();
+  auto io_recorded_random_access_file = 
std::unique_ptr<IoRecordedRandomAccessFile>(

Review comment:
       Fixed. I probably copied this from another usage of `RandomAccessFile`, but 
apparently it is not necessary here.

##########
File path: cpp/src/arrow/ipc/reader.h
##########
@@ -212,6 +212,53 @@ class ARROW_EXPORT RecordBatchFileReader
       arrow::internal::Executor* executor = NULLPTR) = 0;
 };
 
+/// \class IoRecordedRandomAccessFile
+/// \brief An RandomAccessFile that doesn't perform real IO, but only save all 
the IO
+/// operations it receives, including read operation's <offset, length>, for 
replaying
+/// later
+class ARROW_EXPORT IoRecordedRandomAccessFile : public io::RandomAccessFile {

Review comment:
       Fixed. I added a new header file called `reader_internal.h` and put 
this class into it; its implementation is still in `reader.cc`. There are 
other internal classes in `reader.h`, but to keep the change minimal I haven't 
moved them into `reader_internal.h` in this PR. Let me know if 
other changes are needed.

##########
File path: cpp/src/arrow/ipc/message.cc
##########
@@ -279,8 +280,37 @@ std::string FormatMessageType(MessageType type) {
   return "unknown";
 }
 
+Status ReadFieldsSubset(int64_t offset, int32_t metadata_length,
+                        io::RandomAccessFile* file,
+                        const FieldsLoaderFunction& fields_loader,
+                        const std::shared_ptr<Buffer>& metadata, int64_t 
required_size,
+                        std::shared_ptr<Buffer>& body) {
+  const flatbuf::Message* message = nullptr;
+  uint8_t continuation_metadata_size = sizeof(int32_t) + sizeof(int32_t);
+  // skip 8 bytes (32-bit continuation indicator + 32-bit little-endian length 
prefix)
+  RETURN_NOT_OK(internal::VerifyMessage(metadata->data() + 
continuation_metadata_size,
+                                        metadata->size() - 
continuation_metadata_size,
+                                        &message));
+
+  auto batch = message->header_as_RecordBatch();

Review comment:
       You are right. I've now added error checking like we do in `reader.cc`.

##########
File path: cpp/src/arrow/ipc/message.h
##########
@@ -24,6 +24,7 @@
 #include <string>
 #include <utility>
 
+#include "arrow/io/interfaces.h"

Review comment:
       I remember it was `io::ReadRange` that made me add this header. I've 
now forward-declared it in the newly added `reader_internal.h`.

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1061,6 +1062,31 @@ class RecordBatchFileReaderImpl : public 
RecordBatchFileReader {
     return internal::GetMetadataVersion(footer_->version());
   }
 
+  static Status LoadFieldsSubset(const flatbuf::RecordBatch* metadata,
+                                 const IpcReadOptions& options,
+                                 io::RandomAccessFile* file,
+                                 const std::shared_ptr<Schema>& schema,
+                                 const std::vector<bool>* inclusion_mask,
+                                 MetadataVersion metadata_version = 
MetadataVersion::V5) {
+    ArrayLoader loader(metadata, metadata_version, options, file);
+    for (int i = 0; i < schema->num_fields(); ++i) {
+      const Field& field = *schema->field(i);
+      if (!inclusion_mask || (*inclusion_mask)[i]) {
+        // Read field
+        auto column = std::make_shared<ArrayData>();
+        RETURN_NOT_OK(loader.Load(&field, column.get()));

Review comment:
       Good catch. I copied this code snippet from `ArrayLoader`'s usage, and 
in this case it does not need to be a `shared_ptr`. Fixed in the latest 
commit.

##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -1706,6 +1706,54 @@ TEST_F(TestFileFormat, ReadFieldSubset) { 
TestReadSubsetOfFields(); }
 
 TEST_F(TestFileFormatGenerator, ReadFieldSubset) { TestReadSubsetOfFields(); }
 
+class TrackedRandomAccessFile : public io::RandomAccessFile {
+ public:
+  explicit TrackedRandomAccessFile(io::RandomAccessFile* delegate)
+      : delegate_(delegate) {}
+
+  Status Close() override { return delegate_->Close(); }
+  bool closed() const override { return delegate_->closed(); }
+  Result<int64_t> Tell() const override { return delegate_->Tell(); }
+  Status Seek(int64_t position) override { return delegate_->Seek(position); }
+  Result<int64_t> Read(int64_t nbytes, void* out) override {
+    ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
+    save_read_range(position, nbytes);
+    return delegate_->Read(nbytes, out);
+  }
+  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
+    ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
+    save_read_range(position, nbytes);
+    return delegate_->Read(nbytes);
+  }
+  bool supports_zero_copy() const override { return 
delegate_->supports_zero_copy(); }
+  Result<int64_t> GetSize() override { return delegate_->GetSize(); }
+  Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override 
{
+    save_read_range(position, nbytes);
+    return delegate_->ReadAt(position, nbytes, out);
+  }
+  Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) 
override {
+    save_read_range(position, nbytes);
+    return delegate_->ReadAt(position, nbytes);
+  }
+  Future<std::shared_ptr<Buffer>> ReadAsync(const io::IOContext& io_context,
+                                            int64_t position, int64_t nbytes) 
override {
+    save_read_range(position, nbytes);
+    return delegate_->ReadAsync(io_context, position, nbytes);
+  }
+
+  int64_t num_reads() const { return read_ranges_.size(); }
+
+  const std::vector<io::ReadRange>& get_read_ranges() const { return 
read_ranges_; }
+
+ private:
+  io::RandomAccessFile* delegate_;
+  std::vector<io::ReadRange> read_ranges_;
+
+  void save_read_range(int64_t offset, int64_t length) {

Review comment:
       Changed to `CamelCase` now. 
   
   BTW, where can I find documentation on the method/variable naming convention? I 
see methods/variables named with different conventions, and I am not sure which one to 
follow.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to