This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 771c37aab8 GH-34051: [C++] GcsFileSystem lazily starts sequential reads (#34052)
771c37aab8 is described below

commit 771c37aab8757287b3fa9cfe1bfb87992126ee08
Author: Carlos O'Ryan <[email protected]>
AuthorDate: Tue Feb 7 00:11:35 2023 +0000

    GH-34051: [C++] GcsFileSystem lazily starts sequential reads (#34052)
    
    `OpenInputFile()` returns an `io::RandomAccessFile`, which supports
sequential reads as well as random access reads. The previous implementation
eagerly started a sequential read, but many applications do not use that aspect
of the API. Because GCS has fairly high latency, this can slow down
applications that are only going to read data using `ReadAt()`. This includes
applications that read Parquet files via PyArrow.
    
    Fixes #34051
    
    ### What changes are included in this PR?
    
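    Change the `GcsRandomAccessFile` returned by `GcsFileSystem::OpenInputFile()`
to lazily start the download used to implement its sequential-read APIs
(`Read()`, `Tell()`, etc.), so the download only begins when one of them is
first needed instead of when the file is opened.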
    
    ### Are these changes tested?
    
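    Yes. The existing tests cover the affected functions, and a new test,
`OpenInputFileIoContext`, checks that the file returned by `OpenInputFile()`
reports the same I/O context (`external_id()`) as the filesystem.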
    
    ### Are there any user-facing changes?
    
    No.
    
    * Closes: #34051
    
    Authored-by: Carlos O'Ryan <[email protected]>
    Signed-off-by: Sutou Kouhei <[email protected]>
---
 cpp/src/arrow/filesystem/gcsfs.cc      | 56 +++++++++++++++++++---------------
 cpp/src/arrow/filesystem/gcsfs_test.cc | 16 ++++++++++
 2 files changed, 48 insertions(+), 24 deletions(-)

diff --git a/cpp/src/arrow/filesystem/gcsfs.cc 
b/cpp/src/arrow/filesystem/gcsfs.cc
index ce11c0aa22..08099d94f9 100644
--- a/cpp/src/arrow/filesystem/gcsfs.cc
+++ b/cpp/src/arrow/filesystem/gcsfs.cc
@@ -247,32 +247,41 @@ using InputStreamFactory = 
std::function<Result<std::shared_ptr<GcsInputStream>>
 
 class GcsRandomAccessFile : public arrow::io::RandomAccessFile {
  public:
-  GcsRandomAccessFile(InputStreamFactory factory, gcs::ObjectMetadata metadata,
-                      std::shared_ptr<GcsInputStream> stream)
-      : factory_(std::move(factory)),
-        metadata_(std::move(metadata)),
-        stream_(std::move(stream)) {}
+  GcsRandomAccessFile(InputStreamFactory factory, gcs::ObjectMetadata metadata)
+      : factory_(std::move(factory)), metadata_(std::move(metadata)) {}
   ~GcsRandomAccessFile() override = default;
 
   //@{
   // @name FileInterface
-  Status Close() override { return stream_->Close(); }
-  Status Abort() override { return stream_->Abort(); }
-  Result<int64_t> Tell() const override { return 
stream_->TellOr(metadata_.size()); }
-  bool closed() const override { return stream_->closed(); }
+  Status Close() override {
+    ARROW_RETURN_NOT_OK(InitializeStream());
+    return stream_->Close();
+  }
+  Status Abort() override {
+    ARROW_RETURN_NOT_OK(InitializeStream());
+    return stream_->Abort();
+  }
+  Result<int64_t> Tell() const override {
+    ARROW_RETURN_NOT_OK(InitializeStream());
+    return stream_->TellOr(metadata_.size());
+  }
+  bool closed() const override {
+    auto status = InitializeStream();
+    if (!status.ok()) return true;
+    return stream_->closed();
+  }
   //@}
 
   //@{
   // @name Readable
   Result<int64_t> Read(int64_t nbytes, void* out) override {
+    ARROW_RETURN_NOT_OK(InitializeStream());
     return stream_->Read(nbytes, out);
   }
   Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
+    ARROW_RETURN_NOT_OK(InitializeStream());
     return stream_->Read(nbytes);
   }
-  const arrow::io::IOContext& io_context() const override {
-    return stream_->io_context();
-  }
   //@}
 
   //@{
@@ -310,9 +319,16 @@ class GcsRandomAccessFile : public 
arrow::io::RandomAccessFile {
   }
 
  private:
+  Status InitializeStream() const {
+    if (!stream_) {
+      ARROW_ASSIGN_OR_RAISE(stream_, 
factory_(gcs::Generation(metadata_.generation()),
+                                              gcs::ReadFromOffset()));
+    }
+    return Status::OK();
+  }
   InputStreamFactory factory_;
   gcs::ObjectMetadata metadata_;
-  std::shared_ptr<GcsInputStream> stream_;
+  std::shared_ptr<GcsInputStream> mutable stream_;
 };
 
 google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) {
@@ -929,12 +945,9 @@ Result<std::shared_ptr<io::RandomAccessFile>> 
GcsFileSystem::OpenInputFile(
   auto open_stream = [impl, p](gcs::Generation g, gcs::ReadFromOffset offset) {
     return impl->OpenInputStream(p, g, offset);
   };
-  ARROW_ASSIGN_OR_RAISE(auto stream,
-                        impl_->OpenInputStream(p, 
gcs::Generation(metadata->generation()),
-                                               gcs::ReadFromOffset()));
 
   return std::make_shared<GcsRandomAccessFile>(std::move(open_stream),
-                                               *std::move(metadata), 
std::move(stream));
+                                               *std::move(metadata));
 }
 
 Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
@@ -947,16 +960,11 @@ Result<std::shared_ptr<io::RandomAccessFile>> 
GcsFileSystem::OpenInputFile(
   ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
   auto metadata = impl_->GetObjectMetadata(p);
   ARROW_GCS_RETURN_NOT_OK(metadata.status());
-  auto impl = impl_;
-  auto open_stream = [impl, p](gcs::Generation g, gcs::ReadFromOffset offset) {
+  auto open_stream = [impl = impl_, p](gcs::Generation g, gcs::ReadFromOffset 
offset) {
     return impl->OpenInputStream(p, g, offset);
   };
-  ARROW_ASSIGN_OR_RAISE(auto stream,
-                        impl_->OpenInputStream(p, 
gcs::Generation(metadata->generation()),
-                                               gcs::ReadFromOffset()));
-
   return std::make_shared<GcsRandomAccessFile>(std::move(open_stream),
-                                               *std::move(metadata), 
std::move(stream));
+                                               *std::move(metadata));
 }
 
 Result<std::shared_ptr<io::OutputStream>> GcsFileSystem::OpenOutputStream(
diff --git a/cpp/src/arrow/filesystem/gcsfs_test.cc 
b/cpp/src/arrow/filesystem/gcsfs_test.cc
index fb14f7b385..af22087530 100644
--- a/cpp/src/arrow/filesystem/gcsfs_test.cc
+++ b/cpp/src/arrow/filesystem/gcsfs_test.cc
@@ -1320,6 +1320,22 @@ TEST_F(GcsIntegrationTest, OpenInputFileRandomSeek) {
   }
 }
 
+TEST_F(GcsIntegrationTest, OpenInputFileIoContext) {
+  auto fs = GcsFileSystem::Make(TestGcsOptions());
+
+  // Create a test file.
+  const auto path = PreexistingBucketPath() + 
"OpenInputFileIoContext/object-name";
+  std::shared_ptr<io::OutputStream> output;
+  ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {}));
+  const std::string contents = "The quick brown fox jumps over the lazy dog";
+  ASSERT_OK(output->Write(contents.data(), contents.size()));
+  ASSERT_OK(output->Close());
+
+  std::shared_ptr<io::RandomAccessFile> file;
+  ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
+  EXPECT_EQ(fs->io_context().external_id(), file->io_context().external_id());
+}
+
 TEST_F(GcsIntegrationTest, OpenInputFileInfo) {
   auto fs = GcsFileSystem::Make(TestGcsOptions());
 

Reply via email to