This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b65c84a1f GH-34063: [C++] Avoid waste in `GcsFileSystem::ReadAt()` 
(#34065)
6b65c84a1f is described below

commit 6b65c84a1f5bc92ea83b736f1c955d807f3036d2
Author: Carlos O'Ryan <[email protected]>
AuthorDate: Wed Feb 8 18:30:55 2023 +0000

    GH-34063: [C++] Avoid waste in `GcsFileSystem::ReadAt()` (#34065)
    
    With this change, `GcsFileSystem::ReadAt()` requests *exactly* the bytes it needs (a bounded `gcs::ReadRange`) instead of an open-ended read starting at the offset.
    
    Fixes #34063.
    
    ### Are these changes tested?
    
    There are existing tests for `ReadAt()`; this is just a performance change.
    
    ### Are there any user-facing changes?
    
    No.
    
    * Closes: #34063
    
    Authored-by: Carlos O'Ryan <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 cpp/src/arrow/filesystem/gcsfs.cc | 35 +++++++++++++++++++++--------------
 1 file changed, 21 insertions(+), 14 deletions(-)

diff --git a/cpp/src/arrow/filesystem/gcsfs.cc 
b/cpp/src/arrow/filesystem/gcsfs.cc
index 08099d94f9..f063e31b5c 100644
--- a/cpp/src/arrow/filesystem/gcsfs.cc
+++ b/cpp/src/arrow/filesystem/gcsfs.cc
@@ -243,7 +243,7 @@ class GcsOutputStream : public arrow::io::OutputStream {
 };
 
 using InputStreamFactory = 
std::function<Result<std::shared_ptr<GcsInputStream>>(
-    gcs::Generation, gcs::ReadFromOffset)>;
+    gcs::Generation, gcs::ReadRange, gcs::ReadFromOffset)>;
 
 class GcsRandomAccessFile : public arrow::io::RandomAccessFile {
  public:
@@ -298,14 +298,16 @@ class GcsRandomAccessFile : public 
arrow::io::RandomAccessFile {
     if (closed()) return Status::Invalid("Cannot read from closed file");
     std::shared_ptr<io::InputStream> stream;
     ARROW_ASSIGN_OR_RAISE(stream, 
factory_(gcs::Generation(metadata_.generation()),
-                                           gcs::ReadFromOffset(position)));
+                                           gcs::ReadRange(position, position + 
nbytes),
+                                           gcs::ReadFromOffset()));
     return stream->Read(nbytes, out);
   }
   Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) 
override {
     if (closed()) return Status::Invalid("Cannot read from closed file");
     std::shared_ptr<io::InputStream> stream;
     ARROW_ASSIGN_OR_RAISE(stream, 
factory_(gcs::Generation(metadata_.generation()),
-                                           gcs::ReadFromOffset(position)));
+                                           gcs::ReadRange(position, position + 
nbytes),
+                                           gcs::ReadFromOffset()));
     return stream->Read(nbytes);
   }
   //@}
@@ -313,8 +315,9 @@ class GcsRandomAccessFile : public 
arrow::io::RandomAccessFile {
   // from Seekable
   Status Seek(int64_t position) override {
     if (closed()) return Status::Invalid("Cannot seek in a closed file");
-    ARROW_ASSIGN_OR_RAISE(stream_, 
factory_(gcs::Generation(metadata_.generation()),
-                                            gcs::ReadFromOffset(position)));
+    ARROW_ASSIGN_OR_RAISE(
+        stream_, factory_(gcs::Generation(metadata_.generation()), 
gcs::ReadRange(),
+                          gcs::ReadFromOffset(position)));
     return Status::OK();
   }
 
@@ -322,7 +325,7 @@ class GcsRandomAccessFile : public 
arrow::io::RandomAccessFile {
   Status InitializeStream() const {
     if (!stream_) {
       ARROW_ASSIGN_OR_RAISE(stream_, 
factory_(gcs::Generation(metadata_.generation()),
-                                              gcs::ReadFromOffset()));
+                                              gcs::ReadRange(), 
gcs::ReadFromOffset()));
     }
     return Status::OK();
   }
@@ -632,8 +635,9 @@ class GcsFileSystem::Impl {
 
   Result<std::shared_ptr<GcsInputStream>> OpenInputStream(const GcsPath& path,
                                                           gcs::Generation 
generation,
+                                                          gcs::ReadRange range,
                                                           gcs::ReadFromOffset 
offset) {
-    auto stream = client_.ReadObject(path.bucket, path.object, generation, 
offset);
+    auto stream = client_.ReadObject(path.bucket, path.object, generation, 
range, offset);
     ARROW_GCS_RETURN_NOT_OK(stream.status());
     return std::make_shared<GcsInputStream>(std::move(stream), path, 
generation, client_);
   }
@@ -921,7 +925,8 @@ Result<std::shared_ptr<io::InputStream>> 
GcsFileSystem::OpenInputStream(
     const std::string& path) {
   ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(path));
   ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path));
-  return impl_->OpenInputStream(p, gcs::Generation(), gcs::ReadFromOffset());
+  return impl_->OpenInputStream(p, gcs::Generation(), gcs::ReadRange(),
+                                gcs::ReadFromOffset());
 }
 
 Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
@@ -932,7 +937,8 @@ Result<std::shared_ptr<io::InputStream>> 
GcsFileSystem::OpenInputStream(
   }
   ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(info.path()));
   ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
-  return impl_->OpenInputStream(p, gcs::Generation(), gcs::ReadFromOffset());
+  return impl_->OpenInputStream(p, gcs::Generation(), gcs::ReadRange(),
+                                gcs::ReadFromOffset());
 }
 
 Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
@@ -941,9 +947,9 @@ Result<std::shared_ptr<io::RandomAccessFile>> 
GcsFileSystem::OpenInputFile(
   ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(path));
   auto metadata = impl_->GetObjectMetadata(p);
   ARROW_GCS_RETURN_NOT_OK(metadata.status());
-  auto impl = impl_;
-  auto open_stream = [impl, p](gcs::Generation g, gcs::ReadFromOffset offset) {
-    return impl->OpenInputStream(p, g, offset);
+  auto open_stream = [impl = impl_, p](gcs::Generation g, gcs::ReadRange range,
+                                       gcs::ReadFromOffset offset) {
+    return impl->OpenInputStream(p, g, range, offset);
   };
 
   return std::make_shared<GcsRandomAccessFile>(std::move(open_stream),
@@ -960,8 +966,9 @@ Result<std::shared_ptr<io::RandomAccessFile>> 
GcsFileSystem::OpenInputFile(
   ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
   auto metadata = impl_->GetObjectMetadata(p);
   ARROW_GCS_RETURN_NOT_OK(metadata.status());
-  auto open_stream = [impl = impl_, p](gcs::Generation g, gcs::ReadFromOffset 
offset) {
-    return impl->OpenInputStream(p, g, offset);
+  auto open_stream = [impl = impl_, p](gcs::Generation g, gcs::ReadRange range,
+                                       gcs::ReadFromOffset offset) {
+    return impl->OpenInputStream(p, g, range, offset);
   };
   return std::make_shared<GcsRandomAccessFile>(std::move(open_stream),
                                                *std::move(metadata));

Reply via email to