lidavidm commented on a change in pull request #11486:
URL: https://github.com/apache/arrow/pull/11486#discussion_r740561405



##########
File path: cpp/src/arrow/filesystem/mockfs.cc
##########
@@ -242,8 +242,32 @@ class MockFSInputStream : public io::BufferReader {
     return metadata_;
   }
 
+  Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out) 
override {
+    RecordReadIo(position, nbytes);
+    return io::BufferReader::DoReadAt(position, nbytes, out);
+  }
+
+  Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes) 
override {
+    RecordReadIo(position, nbytes);
+    return io::BufferReader::DoReadAt(position, nbytes);
+  }

Review comment:
       Sounds good to me. The same people are reviewing both PRs, so we should be aware of the overlap.

##########
File path: cpp/src/arrow/ipc/message.cc
##########
@@ -279,8 +280,37 @@ std::string FormatMessageType(MessageType type) {
   return "unknown";
 }
 
+Status ReadFieldsSubset(int64_t offset, int32_t metadata_length,
+                        io::RandomAccessFile* file,
+                        const FieldsLoaderFunction& fields_loader,
+                        const std::shared_ptr<Buffer>& metadata, int64_t 
required_size,
+                        std::shared_ptr<Buffer>& body) {
+  const flatbuf::Message* message = nullptr;
+  uint8_t continuation_metadata_size = sizeof(int32_t) + sizeof(int32_t);
+  // skip 8 bytes (32-bit continuation indicator + 32-bit little-endian length 
prefix)
+  RETURN_NOT_OK(internal::VerifyMessage(metadata->data() + 
continuation_metadata_size,
+                                        metadata->size() - 
continuation_metadata_size,
+                                        &message));
+
+  auto batch = message->header_as_RecordBatch();
+  auto io_recorded_random_access_file = 
std::unique_ptr<IoRecordedRandomAccessFile>(

Review comment:
       nit: since the pointer doesn't escape this function, we can stack-allocate the file
instead of wrapping it in a `std::unique_ptr`.
   
   ```cpp
   IoRecordedRandomAccessFile file(required_size);
   RETURN_NOT_OK(fields_loader(batch, &file));
   // etc.
   ```

##########
File path: cpp/src/arrow/ipc/message.h
##########
@@ -24,6 +24,7 @@
 #include <string>
 #include <utility>
 
+#include "arrow/io/interfaces.h"

Review comment:
       We can forward-declare `io::RandomAccessFile` (e.g. by including `arrow/io/type_fwd.h`)
if we want to avoid including the entire `arrow/io/interfaces.h` header.

##########
File path: cpp/src/arrow/ipc/message.cc
##########
@@ -279,8 +280,37 @@ std::string FormatMessageType(MessageType type) {
   return "unknown";
 }
 
+Status ReadFieldsSubset(int64_t offset, int32_t metadata_length,
+                        io::RandomAccessFile* file,
+                        const FieldsLoaderFunction& fields_loader,
+                        const std::shared_ptr<Buffer>& metadata, int64_t 
required_size,
+                        std::shared_ptr<Buffer>& body) {
+  const flatbuf::Message* message = nullptr;
+  uint8_t continuation_metadata_size = sizeof(int32_t) + sizeof(int32_t);
+  // skip 8 bytes (32-bit continuation indicator + 32-bit little-endian length 
prefix)
+  RETURN_NOT_OK(internal::VerifyMessage(metadata->data() + 
continuation_metadata_size,
+                                        metadata->size() - 
continuation_metadata_size,
+                                        &message));
+
+  auto batch = message->header_as_RecordBatch();

Review comment:
       Should we return an error if the message isn't a record batch? In that case
`header_as_RecordBatch()` returns null, and `batch` is not checked before it is used.

##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -1706,6 +1706,54 @@ TEST_F(TestFileFormat, ReadFieldSubset) { 
TestReadSubsetOfFields(); }
 
 TEST_F(TestFileFormatGenerator, ReadFieldSubset) { TestReadSubsetOfFields(); }
 
+class TrackedRandomAccessFile : public io::RandomAccessFile {
+ public:
+  explicit TrackedRandomAccessFile(io::RandomAccessFile* delegate)
+      : delegate_(delegate) {}
+
+  Status Close() override { return delegate_->Close(); }
+  bool closed() const override { return delegate_->closed(); }
+  Result<int64_t> Tell() const override { return delegate_->Tell(); }
+  Status Seek(int64_t position) override { return delegate_->Seek(position); }
+  Result<int64_t> Read(int64_t nbytes, void* out) override {
+    ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
+    save_read_range(position, nbytes);
+    return delegate_->Read(nbytes, out);
+  }
+  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
+    ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
+    save_read_range(position, nbytes);
+    return delegate_->Read(nbytes);
+  }
+  bool supports_zero_copy() const override { return 
delegate_->supports_zero_copy(); }
+  Result<int64_t> GetSize() override { return delegate_->GetSize(); }
+  Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override 
{
+    save_read_range(position, nbytes);
+    return delegate_->ReadAt(position, nbytes, out);
+  }
+  Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) 
override {
+    save_read_range(position, nbytes);
+    return delegate_->ReadAt(position, nbytes);
+  }
+  Future<std::shared_ptr<Buffer>> ReadAsync(const io::IOContext& io_context,
+                                            int64_t position, int64_t nbytes) 
override {
+    save_read_range(position, nbytes);
+    return delegate_->ReadAsync(io_context, position, nbytes);
+  }
+
+  int64_t num_reads() const { return read_ranges_.size(); }
+
+  const std::vector<io::ReadRange>& get_read_ranges() const { return 
read_ranges_; }
+
+ private:
+  io::RandomAccessFile* delegate_;
+  std::vector<io::ReadRange> read_ranges_;
+
+  void save_read_range(int64_t offset, int64_t length) {

Review comment:
       nit: non-getter methods use CamelCase naming, e.g. `SaveReadRange` rather than
`save_read_range`.

##########
File path: cpp/src/arrow/ipc/reader.h
##########
@@ -212,6 +212,53 @@ class ARROW_EXPORT RecordBatchFileReader
       arrow::internal::Executor* executor = NULLPTR) = 0;
 };
 
+/// \class IoRecordedRandomAccessFile
+/// \brief An RandomAccessFile that doesn't perform real IO, but only save all 
the IO
+/// operations it receives, including read operation's <offset, length>, for 
replaying
+/// later
+class ARROW_EXPORT IoRecordedRandomAccessFile : public io::RandomAccessFile {

Review comment:
       nit: could this be moved to a `reader_internal.h` or at least be put in 
a `namespace internal` (e.g. at the bottom) to make it clear that it's not 
really part of the public API?
   
   (`*_internal.h` headers don't get installed so they're ideal for this 
purpose)

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1061,6 +1062,31 @@ class RecordBatchFileReaderImpl : public 
RecordBatchFileReader {
     return internal::GetMetadataVersion(footer_->version());
   }
 
+  static Status LoadFieldsSubset(const flatbuf::RecordBatch* metadata,
+                                 const IpcReadOptions& options,
+                                 io::RandomAccessFile* file,
+                                 const std::shared_ptr<Schema>& schema,
+                                 const std::vector<bool>* inclusion_mask,
+                                 MetadataVersion metadata_version = 
MetadataVersion::V5) {
+    ArrayLoader loader(metadata, metadata_version, options, file);
+    for (int i = 0; i < schema->num_fields(); ++i) {
+      const Field& field = *schema->field(i);
+      if (!inclusion_mask || (*inclusion_mask)[i]) {
+        // Read field
+        auto column = std::make_shared<ArrayData>();
+        RETURN_NOT_OK(loader.Load(&field, column.get()));

Review comment:
       Is this column also only used for the side effect of `Load`? If so, this `ArrayData`
can be stack-allocated too, I think.

##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -1706,6 +1706,54 @@ TEST_F(TestFileFormat, ReadFieldSubset) { 
TestReadSubsetOfFields(); }
 
 TEST_F(TestFileFormatGenerator, ReadFieldSubset) { TestReadSubsetOfFields(); }
 
+class TrackedRandomAccessFile : public io::RandomAccessFile {
+ public:
+  explicit TrackedRandomAccessFile(io::RandomAccessFile* delegate)
+      : delegate_(delegate) {}
+
+  Status Close() override { return delegate_->Close(); }
+  bool closed() const override { return delegate_->closed(); }
+  Result<int64_t> Tell() const override { return delegate_->Tell(); }
+  Status Seek(int64_t position) override { return delegate_->Seek(position); }
+  Result<int64_t> Read(int64_t nbytes, void* out) override {
+    ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
+    save_read_range(position, nbytes);
+    return delegate_->Read(nbytes, out);
+  }
+  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
+    ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
+    save_read_range(position, nbytes);
+    return delegate_->Read(nbytes);
+  }
+  bool supports_zero_copy() const override { return 
delegate_->supports_zero_copy(); }
+  Result<int64_t> GetSize() override { return delegate_->GetSize(); }
+  Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override 
{
+    save_read_range(position, nbytes);
+    return delegate_->ReadAt(position, nbytes, out);
+  }
+  Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) 
override {
+    save_read_range(position, nbytes);
+    return delegate_->ReadAt(position, nbytes);
+  }
+  Future<std::shared_ptr<Buffer>> ReadAsync(const io::IOContext& io_context,
+                                            int64_t position, int64_t nbytes) 
override {
+    save_read_range(position, nbytes);
+    return delegate_->ReadAsync(io_context, position, nbytes);
+  }
+
+  int64_t num_reads() const { return read_ranges_.size(); }
+
+  const std::vector<io::ReadRange>& get_read_ranges() const { return 
read_ranges_; }
+
+ private:
+  io::RandomAccessFile* delegate_;
+  std::vector<io::ReadRange> read_ranges_;
+
+  void save_read_range(int64_t offset, int64_t length) {

Review comment:
       Sorry, I should have linked the style guide:
https://arrow.apache.org/docs/developers/cpp/development.html#code-style-linting-and-ci

##########
File path: cpp/src/arrow/ipc/message.h
##########
@@ -441,6 +441,10 @@ class ARROW_EXPORT MessageReader {
   virtual Result<std::unique_ptr<Message>> ReadNextMessage() = 0;
 };
 
+// the first parameter of the function should be a pointer to metadata (aka.
+// org::apache::arrow::flatbuf::RecordBatch*)
+using FieldsLoaderFunction = std::function<Status(const void*, 
io::RandomAccessFile*)>;

Review comment:
       From CI, it seems this header now needs `#include <functional>` (for `std::function`).

##########
File path: cpp/src/arrow/ipc/reader_internal.h
##########
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "arrow/io/type_fwd.h"
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow {
+namespace ipc {
+
+class ReadRange;

Review comment:
       If this is meant as a forward declaration of `io::ReadRange`, it's in the wrong
namespace: it declares `arrow::ipc::ReadRange`, whereas the code uses `io::ReadRange`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to