This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 771c37aab8 GH-34051: [C++] GcsFileSystem lazily starts sequential
reads (#34052)
771c37aab8 is described below
commit 771c37aab8757287b3fa9cfe1bfb87992126ee08
Author: Carlos O'Ryan <[email protected]>
AuthorDate: Tue Feb 7 00:11:35 2023 +0000
GH-34051: [C++] GcsFileSystem lazily starts sequential reads (#34052)
`OpenInputFile()` returns an `io::RandomAccessFile`, which supports
sequential reads as well as random-access reads. The previous implementation
eagerly started a sequential read, but many applications do not use that aspect
of the API. Because GCS has fairly high latency, this can slow down
applications that are only going to read data using `ReadAt()`. This includes
applications using Parquet files via PyArrow.
Fixes #34051
### What changes are included in this PR?
Change the GcsFileSystem class to lazily start the download used to
implement the `io::InputFile` APIs.
### Are these changes tested?
I think so: the existing tests cover the affected functions.
### Are there any user-facing changes?
No.
* Closes: #34051
Authored-by: Carlos O'Ryan <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
---
cpp/src/arrow/filesystem/gcsfs.cc | 56 +++++++++++++++++++---------------
cpp/src/arrow/filesystem/gcsfs_test.cc | 16 ++++++++++
2 files changed, 48 insertions(+), 24 deletions(-)
diff --git a/cpp/src/arrow/filesystem/gcsfs.cc
b/cpp/src/arrow/filesystem/gcsfs.cc
index ce11c0aa22..08099d94f9 100644
--- a/cpp/src/arrow/filesystem/gcsfs.cc
+++ b/cpp/src/arrow/filesystem/gcsfs.cc
@@ -247,32 +247,41 @@ using InputStreamFactory =
std::function<Result<std::shared_ptr<GcsInputStream>>
class GcsRandomAccessFile : public arrow::io::RandomAccessFile {
public:
- GcsRandomAccessFile(InputStreamFactory factory, gcs::ObjectMetadata metadata,
- std::shared_ptr<GcsInputStream> stream)
- : factory_(std::move(factory)),
- metadata_(std::move(metadata)),
- stream_(std::move(stream)) {}
+ GcsRandomAccessFile(InputStreamFactory factory, gcs::ObjectMetadata metadata)
+ : factory_(std::move(factory)), metadata_(std::move(metadata)) {}
~GcsRandomAccessFile() override = default;
//@{
// @name FileInterface
- Status Close() override { return stream_->Close(); }
- Status Abort() override { return stream_->Abort(); }
- Result<int64_t> Tell() const override { return
stream_->TellOr(metadata_.size()); }
- bool closed() const override { return stream_->closed(); }
+ Status Close() override {
+ ARROW_RETURN_NOT_OK(InitializeStream());
+ return stream_->Close();
+ }
+ Status Abort() override {
+ ARROW_RETURN_NOT_OK(InitializeStream());
+ return stream_->Abort();
+ }
+ Result<int64_t> Tell() const override {
+ ARROW_RETURN_NOT_OK(InitializeStream());
+ return stream_->TellOr(metadata_.size());
+ }
+ bool closed() const override {
+ auto status = InitializeStream();
+ if (!status.ok()) return true;
+ return stream_->closed();
+ }
//@}
//@{
// @name Readable
Result<int64_t> Read(int64_t nbytes, void* out) override {
+ ARROW_RETURN_NOT_OK(InitializeStream());
return stream_->Read(nbytes, out);
}
Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
+ ARROW_RETURN_NOT_OK(InitializeStream());
return stream_->Read(nbytes);
}
- const arrow::io::IOContext& io_context() const override {
- return stream_->io_context();
- }
//@}
//@{
@@ -310,9 +319,16 @@ class GcsRandomAccessFile : public
arrow::io::RandomAccessFile {
}
private:
+ Status InitializeStream() const {
+ if (!stream_) {
+ ARROW_ASSIGN_OR_RAISE(stream_,
factory_(gcs::Generation(metadata_.generation()),
+ gcs::ReadFromOffset()));
+ }
+ return Status::OK();
+ }
InputStreamFactory factory_;
gcs::ObjectMetadata metadata_;
- std::shared_ptr<GcsInputStream> stream_;
+ std::shared_ptr<GcsInputStream> mutable stream_;
};
google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) {
@@ -929,12 +945,9 @@ Result<std::shared_ptr<io::RandomAccessFile>>
GcsFileSystem::OpenInputFile(
auto open_stream = [impl, p](gcs::Generation g, gcs::ReadFromOffset offset) {
return impl->OpenInputStream(p, g, offset);
};
- ARROW_ASSIGN_OR_RAISE(auto stream,
- impl_->OpenInputStream(p,
gcs::Generation(metadata->generation()),
- gcs::ReadFromOffset()));
return std::make_shared<GcsRandomAccessFile>(std::move(open_stream),
- *std::move(metadata),
std::move(stream));
+ *std::move(metadata));
}
Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
@@ -947,16 +960,11 @@ Result<std::shared_ptr<io::RandomAccessFile>>
GcsFileSystem::OpenInputFile(
ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(info.path()));
auto metadata = impl_->GetObjectMetadata(p);
ARROW_GCS_RETURN_NOT_OK(metadata.status());
- auto impl = impl_;
- auto open_stream = [impl, p](gcs::Generation g, gcs::ReadFromOffset offset) {
+ auto open_stream = [impl = impl_, p](gcs::Generation g, gcs::ReadFromOffset
offset) {
return impl->OpenInputStream(p, g, offset);
};
- ARROW_ASSIGN_OR_RAISE(auto stream,
- impl_->OpenInputStream(p,
gcs::Generation(metadata->generation()),
- gcs::ReadFromOffset()));
-
return std::make_shared<GcsRandomAccessFile>(std::move(open_stream),
- *std::move(metadata),
std::move(stream));
+ *std::move(metadata));
}
Result<std::shared_ptr<io::OutputStream>> GcsFileSystem::OpenOutputStream(
diff --git a/cpp/src/arrow/filesystem/gcsfs_test.cc
b/cpp/src/arrow/filesystem/gcsfs_test.cc
index fb14f7b385..af22087530 100644
--- a/cpp/src/arrow/filesystem/gcsfs_test.cc
+++ b/cpp/src/arrow/filesystem/gcsfs_test.cc
@@ -1320,6 +1320,22 @@ TEST_F(GcsIntegrationTest, OpenInputFileRandomSeek) {
}
}
+TEST_F(GcsIntegrationTest, OpenInputFileIoContext) {
+ auto fs = GcsFileSystem::Make(TestGcsOptions());
+
+ // Create a test file.
+ const auto path = PreexistingBucketPath() +
"OpenInputFileIoContext/object-name";
+ std::shared_ptr<io::OutputStream> output;
+ ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {}));
+ const std::string contents = "The quick brown fox jumps over the lazy dog";
+ ASSERT_OK(output->Write(contents.data(), contents.size()));
+ ASSERT_OK(output->Close());
+
+ std::shared_ptr<io::RandomAccessFile> file;
+ ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile(path));
+ EXPECT_EQ(fs->io_context().external_id(), file->io_context().external_id());
+}
+
TEST_F(GcsIntegrationTest, OpenInputFileInfo) {
auto fs = GcsFileSystem::Make(TestGcsOptions());