This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 6b65c84a1f GH-34063: [C++] Avoid waste in `GcsFileSystem::ReadAt()` (#34065)
6b65c84a1f is described below
commit 6b65c84a1f5bc92ea83b736f1c955d807f3036d2
Author: Carlos O'Ryan <[email protected]>
AuthorDate: Wed Feb 8 18:30:55 2023 +0000
GH-34063: [C++] Avoid waste in `GcsFileSystem::ReadAt()` (#34065)
With this change, `GcsFileSystem::ReadAt()` requests *exactly* the bytes it
needs (a bounded `ReadRange`) instead of opening a stream from the requested
offset onward.
Fixes #34063.
### Are these changes tested?
There are existing tests for `ReadAt()`; this is only a performance change.
### Are there any user-facing changes?
No.
* Closes: #34063
Authored-by: Carlos O'Ryan <[email protected]>
Signed-off-by: David Li <[email protected]>
---
cpp/src/arrow/filesystem/gcsfs.cc | 35 +++++++++++++++++++++--------------
1 file changed, 21 insertions(+), 14 deletions(-)
diff --git a/cpp/src/arrow/filesystem/gcsfs.cc
b/cpp/src/arrow/filesystem/gcsfs.cc
index 08099d94f9..f063e31b5c 100644
--- a/cpp/src/arrow/filesystem/gcsfs.cc
+++ b/cpp/src/arrow/filesystem/gcsfs.cc
@@ -243,7 +243,7 @@ class GcsOutputStream : public arrow::io::OutputStream {
};
using InputStreamFactory =
std::function<Result<std::shared_ptr<GcsInputStream>>(
- gcs::Generation, gcs::ReadFromOffset)>;
+ gcs::Generation, gcs::ReadRange, gcs::ReadFromOffset)>;
class GcsRandomAccessFile : public arrow::io::RandomAccessFile {
public:
@@ -298,14 +298,16 @@ class GcsRandomAccessFile : public
arrow::io::RandomAccessFile {
if (closed()) return Status::Invalid("Cannot read from closed file");
std::shared_ptr<io::InputStream> stream;
ARROW_ASSIGN_OR_RAISE(stream,
factory_(gcs::Generation(metadata_.generation()),
- gcs::ReadFromOffset(position)));
+ gcs::ReadRange(position, position +
nbytes),
+ gcs::ReadFromOffset()));
return stream->Read(nbytes, out);
}
Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes)
override {
if (closed()) return Status::Invalid("Cannot read from closed file");
std::shared_ptr<io::InputStream> stream;
ARROW_ASSIGN_OR_RAISE(stream,
factory_(gcs::Generation(metadata_.generation()),
- gcs::ReadFromOffset(position)));
+ gcs::ReadRange(position, position +
nbytes),
+ gcs::ReadFromOffset()));
return stream->Read(nbytes);
}
//@}
@@ -313,8 +315,9 @@ class GcsRandomAccessFile : public
arrow::io::RandomAccessFile {
// from Seekable
Status Seek(int64_t position) override {
if (closed()) return Status::Invalid("Cannot seek in a closed file");
- ARROW_ASSIGN_OR_RAISE(stream_,
factory_(gcs::Generation(metadata_.generation()),
- gcs::ReadFromOffset(position)));
+ ARROW_ASSIGN_OR_RAISE(
+ stream_, factory_(gcs::Generation(metadata_.generation()),
gcs::ReadRange(),
+ gcs::ReadFromOffset(position)));
return Status::OK();
}
@@ -322,7 +325,7 @@ class GcsRandomAccessFile : public
arrow::io::RandomAccessFile {
Status InitializeStream() const {
if (!stream_) {
ARROW_ASSIGN_OR_RAISE(stream_,
factory_(gcs::Generation(metadata_.generation()),
- gcs::ReadFromOffset()));
+ gcs::ReadRange(),
gcs::ReadFromOffset()));
}
return Status::OK();
}
@@ -632,8 +635,9 @@ class GcsFileSystem::Impl {
Result<std::shared_ptr<GcsInputStream>> OpenInputStream(const GcsPath& path,
gcs::Generation
generation,
+ gcs::ReadRange range,
gcs::ReadFromOffset
offset) {
- auto stream = client_.ReadObject(path.bucket, path.object, generation,
offset);
+ auto stream = client_.ReadObject(path.bucket, path.object, generation,
range, offset);
ARROW_GCS_RETURN_NOT_OK(stream.status());
return std::make_shared<GcsInputStream>(std::move(stream), path,
generation, client_);
}
@@ -921,7 +925,8 @@ Result<std::shared_ptr<io::InputStream>>
GcsFileSystem::OpenInputStream(
const std::string& path) {
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(path));
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path));
- return impl_->OpenInputStream(p, gcs::Generation(), gcs::ReadFromOffset());
+ return impl_->OpenInputStream(p, gcs::Generation(), gcs::ReadRange(),
+ gcs::ReadFromOffset());
}
Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
@@ -932,7 +937,8 @@ Result<std::shared_ptr<io::InputStream>>
GcsFileSystem::OpenInputStream(
}
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(info.path()));
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
- return impl_->OpenInputStream(p, gcs::Generation(), gcs::ReadFromOffset());
+ return impl_->OpenInputStream(p, gcs::Generation(), gcs::ReadRange(),
+ gcs::ReadFromOffset());
}
Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
@@ -941,9 +947,9 @@ Result<std::shared_ptr<io::RandomAccessFile>>
GcsFileSystem::OpenInputFile(
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path));
auto metadata = impl_->GetObjectMetadata(p);
ARROW_GCS_RETURN_NOT_OK(metadata.status());
- auto impl = impl_;
- auto open_stream = [impl, p](gcs::Generation g, gcs::ReadFromOffset offset) {
- return impl->OpenInputStream(p, g, offset);
+ auto open_stream = [impl = impl_, p](gcs::Generation g, gcs::ReadRange range,
+ gcs::ReadFromOffset offset) {
+ return impl->OpenInputStream(p, g, range, offset);
};
return std::make_shared<GcsRandomAccessFile>(std::move(open_stream),
@@ -960,8 +966,9 @@ Result<std::shared_ptr<io::RandomAccessFile>>
GcsFileSystem::OpenInputFile(
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
auto metadata = impl_->GetObjectMetadata(p);
ARROW_GCS_RETURN_NOT_OK(metadata.status());
- auto open_stream = [impl = impl_, p](gcs::Generation g, gcs::ReadFromOffset
offset) {
- return impl->OpenInputStream(p, g, offset);
+ auto open_stream = [impl = impl_, p](gcs::Generation g, gcs::ReadRange range,
+ gcs::ReadFromOffset offset) {
+ return impl->OpenInputStream(p, g, range, offset);
};
return std::make_shared<GcsRandomAccessFile>(std::move(open_stream),
*std::move(metadata));