lidavidm commented on a change in pull request #11486:
URL: https://github.com/apache/arrow/pull/11486#discussion_r741024141
##########
File path: cpp/src/arrow/ipc/message.cc
##########
@@ -279,8 +280,37 @@ std::string FormatMessageType(MessageType type) {
return "unknown";
}
+Status ReadFieldsSubset(int64_t offset, int32_t metadata_length,
+ io::RandomAccessFile* file,
+ const FieldsLoaderFunction& fields_loader,
+ const std::shared_ptr<Buffer>& metadata, int64_t
required_size,
+ std::shared_ptr<Buffer>& body) {
+ const flatbuf::Message* message = nullptr;
+ uint8_t continuation_metadata_size = sizeof(int32_t) + sizeof(int32_t);
+ // skip 8 bytes (32-bit continuation indicator + 32-bit little-endian length
prefix)
+ RETURN_NOT_OK(internal::VerifyMessage(metadata->data() +
continuation_metadata_size,
+ metadata->size() -
continuation_metadata_size,
+ &message));
+
+ auto batch = message->header_as_RecordBatch();
+ auto io_recorded_random_access_file =
std::unique_ptr<IoRecordedRandomAccessFile>(
Review comment:
nit: since the pointer doesn't escape this function, we can
stack-allocate the file instead.
```cpp
IoRecordedRandomAccessFile file(required_size);
RETURN_NOT_OK(fields_loader(batch, &file));
// etc.
```
##########
File path: cpp/src/arrow/ipc/message.h
##########
@@ -24,6 +24,7 @@
#include <string>
#include <utility>
+#include "arrow/io/interfaces.h"
Review comment:
We can forward-declare io::RandomAccessFile (e.g. by including
"arrow/io/type_fwd.h") if we want to avoid including the entire header.
##########
File path: cpp/src/arrow/ipc/message.cc
##########
@@ -279,8 +280,37 @@ std::string FormatMessageType(MessageType type) {
return "unknown";
}
+Status ReadFieldsSubset(int64_t offset, int32_t metadata_length,
+ io::RandomAccessFile* file,
+ const FieldsLoaderFunction& fields_loader,
+ const std::shared_ptr<Buffer>& metadata, int64_t
required_size,
+ std::shared_ptr<Buffer>& body) {
+ const flatbuf::Message* message = nullptr;
+ uint8_t continuation_metadata_size = sizeof(int32_t) + sizeof(int32_t);
+ // skip 8 bytes (32-bit continuation indicator + 32-bit little-endian length
prefix)
+ RETURN_NOT_OK(internal::VerifyMessage(metadata->data() +
continuation_metadata_size,
+ metadata->size() -
continuation_metadata_size,
+ &message));
+
+ auto batch = message->header_as_RecordBatch();
Review comment:
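Should we return an error if the message isn't a record batch?
`header_as_RecordBatch()` returns null for any other header type, and that
null would then be passed on to the fields loader unchecked.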
##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -1706,6 +1706,54 @@ TEST_F(TestFileFormat, ReadFieldSubset) {
TestReadSubsetOfFields(); }
TEST_F(TestFileFormatGenerator, ReadFieldSubset) { TestReadSubsetOfFields(); }
+class TrackedRandomAccessFile : public io::RandomAccessFile {
+ public:
+ explicit TrackedRandomAccessFile(io::RandomAccessFile* delegate)
+ : delegate_(delegate) {}
+
+ Status Close() override { return delegate_->Close(); }
+ bool closed() const override { return delegate_->closed(); }
+ Result<int64_t> Tell() const override { return delegate_->Tell(); }
+ Status Seek(int64_t position) override { return delegate_->Seek(position); }
+ Result<int64_t> Read(int64_t nbytes, void* out) override {
+ ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
+ save_read_range(position, nbytes);
+ return delegate_->Read(nbytes, out);
+ }
+ Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
+ ARROW_ASSIGN_OR_RAISE(auto position, delegate_->Tell());
+ save_read_range(position, nbytes);
+ return delegate_->Read(nbytes);
+ }
+ bool supports_zero_copy() const override { return
delegate_->supports_zero_copy(); }
+ Result<int64_t> GetSize() override { return delegate_->GetSize(); }
+ Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override
{
+ save_read_range(position, nbytes);
+ return delegate_->ReadAt(position, nbytes, out);
+ }
+ Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes)
override {
+ save_read_range(position, nbytes);
+ return delegate_->ReadAt(position, nbytes);
+ }
+ Future<std::shared_ptr<Buffer>> ReadAsync(const io::IOContext& io_context,
+ int64_t position, int64_t nbytes)
override {
+ save_read_range(position, nbytes);
+ return delegate_->ReadAsync(io_context, position, nbytes);
+ }
+
+ int64_t num_reads() const { return read_ranges_.size(); }
+
+ const std::vector<io::ReadRange>& get_read_ranges() const { return
read_ranges_; }
+
+ private:
+ io::RandomAccessFile* delegate_;
+ std::vector<io::ReadRange> read_ranges_;
+
+ void save_read_range(int64_t offset, int64_t length) {
Review comment:
nit: non-getter methods use CamelCase naming (e.g. `SaveReadRange`).
##########
File path: cpp/src/arrow/ipc/reader.h
##########
@@ -212,6 +212,53 @@ class ARROW_EXPORT RecordBatchFileReader
arrow::internal::Executor* executor = NULLPTR) = 0;
};
+/// \class IoRecordedRandomAccessFile
+/// \brief A RandomAccessFile that doesn't perform real IO, but only saves all the IO
+/// operations it receives, including each read operation's <offset, length>, for
+/// replaying later
+class ARROW_EXPORT IoRecordedRandomAccessFile : public io::RandomAccessFile {
Review comment:
nit: could this be moved to a `reader_internal.h` or at least be put in
a `namespace internal` (e.g. at the bottom) to make it clear that it's not
really part of the public API?
(`*_internal.h` headers don't get installed so they're ideal for this
purpose)
##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1061,6 +1062,31 @@ class RecordBatchFileReaderImpl : public
RecordBatchFileReader {
return internal::GetMetadataVersion(footer_->version());
}
+ static Status LoadFieldsSubset(const flatbuf::RecordBatch* metadata,
+ const IpcReadOptions& options,
+ io::RandomAccessFile* file,
+ const std::shared_ptr<Schema>& schema,
+ const std::vector<bool>* inclusion_mask,
+ MetadataVersion metadata_version =
MetadataVersion::V5) {
+ ArrayLoader loader(metadata, metadata_version, options, file);
+ for (int i = 0; i < schema->num_fields(); ++i) {
+ const Field& field = *schema->field(i);
+ if (!inclusion_mask || (*inclusion_mask)[i]) {
+ // Read field
+ auto column = std::make_shared<ArrayData>();
+ RETURN_NOT_OK(loader.Load(&field, column.get()));
Review comment:
Is this column also only used for the side effect of Load? If so, this
ArrayData can be stack-allocated too, I think.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]