niyue commented on a change in pull request #11486:
URL: https://github.com/apache/arrow/pull/11486#discussion_r740561031
##########
File path: cpp/src/arrow/filesystem/mockfs.cc
##########
@@ -242,8 +242,32 @@ class MockFSInputStream : public io::BufferReader {
return metadata_;
}
+ Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out)
override {
+ RecordReadIo(position, nbytes);
+ return io::BufferReader::DoReadAt(position, nbytes, out);
+ }
+
+ Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes)
override {
+ RecordReadIo(position, nbytes);
+ return io::BufferReader::DoReadAt(position, nbytes);
+ }
Review comment:
> While it's not merged yet, perhaps an approach like the one here will
be easier? I don't think we need a full mock FileSystem, either - especially
since the filesystem module isn't necessarily enabled.
https://github.com/apache/arrow/pull/11535/files#diff-900c46995b5706697d6e4b010f610f1a1cf27d4d865afe48de0a800830ac676bL1708
Got it. It seems the simplest way is to modify this
`TrackedRandomAccessFile` so that it records not only the number of read IOs
but also the offset/length of each one. However, that PR is not merged yet. Do
you think I could copy `TrackedRandomAccessFile` into my PR, add the necessary
changes, and reconcile the two when merging later?
##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -1706,6 +1707,54 @@ TEST_F(TestFileFormat, ReadFieldSubset) {
TestReadSubsetOfFields(); }
TEST_F(TestFileFormatGenerator, ReadFieldSubset) { TestReadSubsetOfFields(); }
+class TrackedRandomAccessFile : public io::RandomAccessFile {
+ public:
+ explicit TrackedRandomAccessFile(io::RandomAccessFile* delegate)
+ : delegate_(delegate) {}
+
+ Status Close() override { return delegate_->Close(); }
+ bool closed() const override { return delegate_->closed(); }
+ Result<int64_t> Tell() const override { return delegate_->Tell(); }
+ Status Seek(int64_t position) override { return delegate_->Seek(position); }
+ Result<int64_t> Read(int64_t nbytes, void* out) override {
+ ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
+ save_read_range(position, nbytes);
+ return delegate_->Read(nbytes, out);
+ }
+ Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
+ ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
+ save_read_range(position, nbytes);
+ return delegate_->Read(nbytes);
+ }
+ bool supports_zero_copy() const override { return
delegate_->supports_zero_copy(); }
+ Result<int64_t> GetSize() override { return delegate_->GetSize(); }
+ Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override
{
+ save_read_range(position, nbytes);
+ return delegate_->ReadAt(position, nbytes, out);
+ }
+ Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes)
override {
+ save_read_range(position, nbytes);
+ return delegate_->ReadAt(position, nbytes);
+ }
+ Future<std::shared_ptr<Buffer>> ReadAsync(const io::IOContext& io_context,
+ int64_t position, int64_t nbytes)
override {
+ save_read_range(position, nbytes);
+ return delegate_->ReadAsync(io_context, position, nbytes);
+ }
+
+ int64_t num_reads() const { return read_ranges_.size(); }
+
+ const std::vector<io::ReadRange>& get_read_ranges() const { return
read_ranges_; }
+
+ private:
+ io::RandomAccessFile* delegate_;
+ std::vector<io::ReadRange> read_ranges_;
Review comment:
@lidavidm I copied `TrackedRandomAccessFile` into this PR and now track
the read ranges using a vector. Since `num_reads` is just the length of this
vector, I removed the `read_` member variable that is used in
https://github.com/apache/arrow/pull/11535/files#diff-900c46995b5706697d6e4b010f610f1a1cf27d4d865afe48de0a800830ac676bL1708
##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1358,6 +1398,62 @@ Future<std::shared_ptr<RecordBatchFileReader>>
RecordBatchFileReader::OpenAsync(
.Then([=]() -> Result<std::shared_ptr<RecordBatchFileReader>> { return
result; });
}
+Result<int64_t> IoRecordedRandomAccessFile::GetSize() { return file_size_; }
+
+Result<int64_t> IoRecordedRandomAccessFile::ReadAt(int64_t position, int64_t
nbytes,
+ void* out) {
+ auto num_bytes_read = std::min(file_size_, position + nbytes) - position;
+
+ if (!recorded_reads_.empty() &&
+ position == recorded_reads_.back().offset +
recorded_reads_.back().length) {
+ // merge continuous IOs into one if possible
+ recorded_reads_.back().length += num_bytes_read;
+ } else {
+ // no real IO is performed, it is only saved into a vector for replaying
later
+ recorded_reads_.emplace_back(io::ReadRange{position, num_bytes_read});
+ }
+ return num_bytes_read;
+}
Review comment:
OK. I was not aware of this. I will fix it.
##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1358,6 +1398,62 @@ Future<std::shared_ptr<RecordBatchFileReader>>
RecordBatchFileReader::OpenAsync(
.Then([=]() -> Result<std::shared_ptr<RecordBatchFileReader>> { return
result; });
}
+Result<int64_t> IoRecordedRandomAccessFile::GetSize() { return file_size_; }
+
+Result<int64_t> IoRecordedRandomAccessFile::ReadAt(int64_t position, int64_t
nbytes,
+ void* out) {
+ auto num_bytes_read = std::min(file_size_, position + nbytes) - position;
+
+ if (!recorded_reads_.empty() &&
+ position == recorded_reads_.back().offset +
recorded_reads_.back().length) {
+ // merge continuous IOs into one if possible
+ recorded_reads_.back().length += num_bytes_read;
+ } else {
+ // no real IO is performed, it is only saved into a vector for replaying
later
+ recorded_reads_.emplace_back(io::ReadRange{position, num_bytes_read});
+ }
+ return num_bytes_read;
+}
Review comment:
OK. I was not aware of this. I will fix it.
##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1358,6 +1398,62 @@ Future<std::shared_ptr<RecordBatchFileReader>>
RecordBatchFileReader::OpenAsync(
.Then([=]() -> Result<std::shared_ptr<RecordBatchFileReader>> { return
result; });
}
+Result<int64_t> IoRecordedRandomAccessFile::GetSize() { return file_size_; }
+
+Result<int64_t> IoRecordedRandomAccessFile::ReadAt(int64_t position, int64_t
nbytes,
+ void* out) {
+ auto num_bytes_read = std::min(file_size_, position + nbytes) - position;
+
+ if (!recorded_reads_.empty() &&
+ position == recorded_reads_.back().offset +
recorded_reads_.back().length) {
+ // merge continuous IOs into one if possible
+ recorded_reads_.back().length += num_bytes_read;
+ } else {
+ // no real IO is performed, it is only saved into a vector for replaying
later
+ recorded_reads_.emplace_back(io::ReadRange{position, num_bytes_read});
+ }
+ return num_bytes_read;
+}
Review comment:
@westonpace I fixed the position advancing issue and added one more test
case to cover it.
I also pushed a new commit that I hope addresses the lint issue as well
(thanks to @kou, I have now verified the linting locally using the Docker image).
##########
File path: cpp/src/arrow/ipc/message.cc
##########
@@ -279,8 +280,37 @@ std::string FormatMessageType(MessageType type) {
return "unknown";
}
+Status ReadFieldsSubset(int64_t offset, int32_t metadata_length,
+ io::RandomAccessFile* file,
+ const FieldsLoaderFunction& fields_loader,
+ const std::shared_ptr<Buffer>& metadata, int64_t
required_size,
+ std::shared_ptr<Buffer>& body) {
+ const flatbuf::Message* message = nullptr;
+ uint8_t continuation_metadata_size = sizeof(int32_t) + sizeof(int32_t);
+ // skip 8 bytes (32-bit continuation indicator + 32-bit little-endian length
prefix)
+ RETURN_NOT_OK(internal::VerifyMessage(metadata->data() +
continuation_metadata_size,
+ metadata->size() -
continuation_metadata_size,
+ &message));
+
+ auto batch = message->header_as_RecordBatch();
+ auto io_recorded_random_access_file =
std::unique_ptr<IoRecordedRandomAccessFile>(
Review comment:
Fixed. I probably copied this from another usage of `RandomAccessFile`, but
apparently it is not necessary here.
##########
File path: cpp/src/arrow/ipc/reader.h
##########
@@ -212,6 +212,53 @@ class ARROW_EXPORT RecordBatchFileReader
arrow::internal::Executor* executor = NULLPTR) = 0;
};
+/// \class IoRecordedRandomAccessFile
+/// \brief An RandomAccessFile that doesn't perform real IO, but only save all
the IO
+/// operations it receives, including read operation's <offset, length>, for
replaying
+/// later
+class ARROW_EXPORT IoRecordedRandomAccessFile : public io::RandomAccessFile {
Review comment:
Fixed. I added a new header file called `reader_internal.h` and moved
this class into it; its implementation is still in `reader.cc`. I noticed
there are other internal classes in `reader.h`, but to keep the change minimal
I have not moved them into `reader_internal.h` in this PR. Let me know if
other changes are needed.
##########
File path: cpp/src/arrow/ipc/message.cc
##########
@@ -279,8 +280,37 @@ std::string FormatMessageType(MessageType type) {
return "unknown";
}
+Status ReadFieldsSubset(int64_t offset, int32_t metadata_length,
+ io::RandomAccessFile* file,
+ const FieldsLoaderFunction& fields_loader,
+ const std::shared_ptr<Buffer>& metadata, int64_t
required_size,
+ std::shared_ptr<Buffer>& body) {
+ const flatbuf::Message* message = nullptr;
+ uint8_t continuation_metadata_size = sizeof(int32_t) + sizeof(int32_t);
+ // skip 8 bytes (32-bit continuation indicator + 32-bit little-endian length
prefix)
+ RETURN_NOT_OK(internal::VerifyMessage(metadata->data() +
continuation_metadata_size,
+ metadata->size() -
continuation_metadata_size,
+ &message));
+
+ auto batch = message->header_as_RecordBatch();
Review comment:
You are right. I have now added error checking, as we do in `reader.cc`.
##########
File path: cpp/src/arrow/ipc/message.h
##########
@@ -24,6 +24,7 @@
#include <string>
#include <utility>
+#include "arrow/io/interfaces.h"
Review comment:
I remember it was `io::ReadRange` that made me add this header. I have
now forward-declared it in the newly added `reader_internal.h`.
##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1061,6 +1062,31 @@ class RecordBatchFileReaderImpl : public
RecordBatchFileReader {
return internal::GetMetadataVersion(footer_->version());
}
+ static Status LoadFieldsSubset(const flatbuf::RecordBatch* metadata,
+ const IpcReadOptions& options,
+ io::RandomAccessFile* file,
+ const std::shared_ptr<Schema>& schema,
+ const std::vector<bool>* inclusion_mask,
+ MetadataVersion metadata_version =
MetadataVersion::V5) {
+ ArrayLoader loader(metadata, metadata_version, options, file);
+ for (int i = 0; i < schema->num_fields(); ++i) {
+ const Field& field = *schema->field(i);
+ if (!inclusion_mask || (*inclusion_mask)[i]) {
+ // Read field
+ auto column = std::make_shared<ArrayData>();
+ RETURN_NOT_OK(loader.Load(&field, column.get()));
Review comment:
Good catch. I copied this code snippet from `ArrayLoader`'s usage, and
in this case it does not need to be a `shared_ptr`. Fixed in the latest
commit.
##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -1706,6 +1706,54 @@ TEST_F(TestFileFormat, ReadFieldSubset) {
TestReadSubsetOfFields(); }
TEST_F(TestFileFormatGenerator, ReadFieldSubset) { TestReadSubsetOfFields(); }
+class TrackedRandomAccessFile : public io::RandomAccessFile {
+ public:
+ explicit TrackedRandomAccessFile(io::RandomAccessFile* delegate)
+ : delegate_(delegate) {}
+
+ Status Close() override { return delegate_->Close(); }
+ bool closed() const override { return delegate_->closed(); }
+ Result<int64_t> Tell() const override { return delegate_->Tell(); }
+ Status Seek(int64_t position) override { return delegate_->Seek(position); }
+ Result<int64_t> Read(int64_t nbytes, void* out) override {
+ ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
+ save_read_range(position, nbytes);
+ return delegate_->Read(nbytes, out);
+ }
+ Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
+ ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
+ save_read_range(position, nbytes);
+ return delegate_->Read(nbytes);
+ }
+ bool supports_zero_copy() const override { return
delegate_->supports_zero_copy(); }
+ Result<int64_t> GetSize() override { return delegate_->GetSize(); }
+ Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override
{
+ save_read_range(position, nbytes);
+ return delegate_->ReadAt(position, nbytes, out);
+ }
+ Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes)
override {
+ save_read_range(position, nbytes);
+ return delegate_->ReadAt(position, nbytes);
+ }
+ Future<std::shared_ptr<Buffer>> ReadAsync(const io::IOContext& io_context,
+ int64_t position, int64_t nbytes)
override {
+ save_read_range(position, nbytes);
+ return delegate_->ReadAsync(io_context, position, nbytes);
+ }
+
+ int64_t num_reads() const { return read_ranges_.size(); }
+
+ const std::vector<io::ReadRange>& get_read_ranges() const { return
read_ranges_; }
+
+ private:
+ io::RandomAccessFile* delegate_;
+ std::vector<io::ReadRange> read_ranges_;
+
+ void save_read_range(int64_t offset, int64_t length) {
Review comment:
Changed to `CamelCase` now.
BTW, where can I find the documentation on the method/variable naming
conventions? I see methods/variables named using different conventions, and I
am not sure which one to follow.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]