pitrou commented on a change in pull request #11550: URL: https://github.com/apache/arrow/pull/11550#discussion_r740259034
########## File path: cpp/src/arrow/filesystem/gcsfs_internal.cc ########## @@ -62,6 +65,133 @@ Status ToArrowStatus(const google::cloud::Status& s) { return Status::OK(); } +namespace gcs = ::google::cloud::storage; + +Result<google::cloud::storage::EncryptionKey> ToEncryptionKey( Review comment: Nit: can write `Result<gcs::EncryptionKey>` ########## File path: cpp/src/arrow/filesystem/gcsfs_internal.cc ########## @@ -62,6 +65,133 @@ Status ToArrowStatus(const google::cloud::Status& s) { return Status::OK(); } +namespace gcs = ::google::cloud::storage; + +Result<google::cloud::storage::EncryptionKey> ToEncryptionKey( + const std::shared_ptr<const KeyValueMetadata>& metadata) { + if (!metadata) { + return gcs::EncryptionKey{}; + } + + const auto& keys = metadata->keys(); + const auto& values = metadata->values(); + + for (std::size_t i = 0; i < keys.size(); ++i) { + if (keys[i] == "encryptionKeyBase64") { + return gcs::EncryptionKey::FromBase64Key(values[i]); + } + } + return gcs::EncryptionKey{}; +} + +Result<gcs::KmsKeyName> ToKmsKeyName( + const std::shared_ptr<const KeyValueMetadata>& metadata) { + if (!metadata) { + return gcs::KmsKeyName{}; + } + + const auto& keys = metadata->keys(); + const auto& values = metadata->values(); + + for (std::size_t i = 0; i < keys.size(); ++i) { + if (keys[i] == "kmsKeyName") { + return gcs::KmsKeyName(values[i]); + } + } + return gcs::KmsKeyName{}; +} + +Result<gcs::PredefinedAcl> ToPredefinedAcl( + const std::shared_ptr<const KeyValueMetadata>& metadata) { + if (!metadata) { + return gcs::PredefinedAcl{}; + } + + const auto& keys = metadata->keys(); + const auto& values = metadata->values(); + + for (std::size_t i = 0; i < keys.size(); ++i) { + if (keys[i] == "predefinedAcl") { + return gcs::PredefinedAcl(values[i]); + } + } + return gcs::PredefinedAcl{}; +} + +Result<gcs::WithObjectMetadata> ToObjectMetadata( + const std::shared_ptr<const KeyValueMetadata>& metadata) { + if (!metadata) { + return gcs::WithObjectMetadata{}; 
+ } + + static auto const setters = [] { + using setter = std::function<Status(gcs::ObjectMetadata&, const std::string&)>; + return std::map<std::string, setter>{ + {"cacheControl", Review comment: Is there a particular reason for using this casing? The S3 implementation uses "Cache-Control", "Content-Type", etc., so I'd rather keep the same exact naming rather than reinvent another. ########## File path: cpp/src/arrow/filesystem/gcsfs_test.cc ########## @@ -259,6 +352,55 @@ TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) { EXPECT_EQ(result.status().code(), StatusCode::IOError); } +TEST_F(GcsIntegrationTest, WriteObjectSmall) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + const auto path = kPreexistingBucket + std::string("/test-write-object"); + std::shared_ptr<io::OutputStream> output; + ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); + const auto expected = std::string(kLoremIpsum); + ASSERT_OK(output->Write(expected.data(), expected.size())); + ASSERT_OK(output->Close()); + + // Verify we can read the object back. + std::shared_ptr<io::InputStream> input; + ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path)); + + std::array<char, 1024> inbuf{}; + std::int64_t size; + ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); + + EXPECT_EQ(std::string(inbuf.data(), size), expected); +} + +TEST_F(GcsIntegrationTest, WriteObjectLarge) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + const auto path = kPreexistingBucket + std::string("/test-write-object"); + std::shared_ptr<io::OutputStream> output; + ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); + const auto b0 = std::string(512 * 1024, 'A'); + const auto b1 = std::string(768 * 1024, 'B'); + const auto b2 = std::string(1024 * 1024, 'C'); Review comment: Perhaps choose those string sizes so that they are not multiples of the underlying buffer size? 
########## File path: cpp/src/arrow/filesystem/gcsfs_test.cc ########## @@ -259,6 +352,55 @@ TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) { EXPECT_EQ(result.status().code(), StatusCode::IOError); } +TEST_F(GcsIntegrationTest, WriteObjectSmall) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + const auto path = kPreexistingBucket + std::string("/test-write-object"); + std::shared_ptr<io::OutputStream> output; + ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); + const auto expected = std::string(kLoremIpsum); + ASSERT_OK(output->Write(expected.data(), expected.size())); + ASSERT_OK(output->Close()); + + // Verify we can read the object back. + std::shared_ptr<io::InputStream> input; + ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path)); + + std::array<char, 1024> inbuf{}; + std::int64_t size; + ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data())); + + EXPECT_EQ(std::string(inbuf.data(), size), expected); +} + +TEST_F(GcsIntegrationTest, WriteObjectLarge) { + auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions()); + + const auto path = kPreexistingBucket + std::string("/test-write-object"); + std::shared_ptr<io::OutputStream> output; + ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {})); + const auto b0 = std::string(512 * 1024, 'A'); + const auto b1 = std::string(768 * 1024, 'B'); + const auto b2 = std::string(1024 * 1024, 'C'); + ASSERT_OK(output->Write(b0.data(), b0.size())); + ASSERT_OK(output->Write(b1.data(), b1.size())); + ASSERT_OK(output->Write(b2.data(), b2.size())); + ASSERT_OK(output->Close()); + + // Verify we can read the object back. 
+ std::shared_ptr<io::InputStream> input; + ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path)); + + std::string contents; + std::shared_ptr<Buffer> buffer; + do { + ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024)); + contents.append(buffer->ToString()); + } while (buffer && buffer->size() != 0); Review comment: Nit: `buffer` should never be null in this context. If it is, it's a bug in `Read`. ########## File path: cpp/src/arrow/filesystem/gcsfs_internal.cc ########## @@ -62,6 +65,133 @@ Status ToArrowStatus(const google::cloud::Status& s) { return Status::OK(); } +namespace gcs = ::google::cloud::storage; + +Result<google::cloud::storage::EncryptionKey> ToEncryptionKey( + const std::shared_ptr<const KeyValueMetadata>& metadata) { + if (!metadata) { + return gcs::EncryptionKey{}; + } + + const auto& keys = metadata->keys(); + const auto& values = metadata->values(); + + for (std::size_t i = 0; i < keys.size(); ++i) { + if (keys[i] == "encryptionKeyBase64") { + return gcs::EncryptionKey::FromBase64Key(values[i]); + } + } + return gcs::EncryptionKey{}; +} + +Result<gcs::KmsKeyName> ToKmsKeyName( + const std::shared_ptr<const KeyValueMetadata>& metadata) { + if (!metadata) { + return gcs::KmsKeyName{}; + } + + const auto& keys = metadata->keys(); + const auto& values = metadata->values(); + + for (std::size_t i = 0; i < keys.size(); ++i) { + if (keys[i] == "kmsKeyName") { + return gcs::KmsKeyName(values[i]); + } + } + return gcs::KmsKeyName{}; +} + +Result<gcs::PredefinedAcl> ToPredefinedAcl( + const std::shared_ptr<const KeyValueMetadata>& metadata) { + if (!metadata) { + return gcs::PredefinedAcl{}; + } + + const auto& keys = metadata->keys(); + const auto& values = metadata->values(); + + for (std::size_t i = 0; i < keys.size(); ++i) { + if (keys[i] == "predefinedAcl") { + return gcs::PredefinedAcl(values[i]); + } + } + return gcs::PredefinedAcl{}; +} + +Result<gcs::WithObjectMetadata> ToObjectMetadata( + const std::shared_ptr<const KeyValueMetadata>& 
metadata) { + if (!metadata) { + return gcs::WithObjectMetadata{}; + } + + static auto const setters = [] { + using setter = std::function<Status(gcs::ObjectMetadata&, const std::string&)>; + return std::map<std::string, setter>{ + {"cacheControl", + [](gcs::ObjectMetadata& m, const std::string& v) { + m.set_cache_control(v); + return Status::OK(); + }}, + {"contentDisposition", + [](gcs::ObjectMetadata& m, const std::string& v) { + m.set_content_disposition(v); + return Status::OK(); + }}, + {"contentEncoding", + [](gcs::ObjectMetadata& m, const std::string& v) { + m.set_content_encoding(v); + return Status::OK(); + }}, + {"contentLanguage", + [](gcs::ObjectMetadata& m, const std::string& v) { + m.set_content_language(v); + return Status::OK(); + }}, + {"contentType", + [](gcs::ObjectMetadata& m, const std::string& v) { + m.set_content_type(v); + return Status::OK(); + }}, + {"customTime", + [](gcs::ObjectMetadata& m, const std::string& v) { + std::string err; + absl::Time t; + if (!absl::ParseTime(absl::RFC3339_full, v, &t, &err)) { Review comment: Is absl already an include-time dependency of GCS? ########## File path: cpp/src/arrow/filesystem/gcsfs.cc ########## @@ -102,6 +109,42 @@ class GcsInputStream : public arrow::io::InputStream { mutable gcs::ObjectReadStream stream_; }; +class GcsOutputStream : public arrow::io::OutputStream { + public: + explicit GcsOutputStream(gcs::ObjectWriteStream stream) : stream_(std::move(stream)) {} + ~GcsOutputStream() override = default; + + Status Close() override { + stream_.Close(); + return internal::ToArrowStatus(stream_.last_status()); Review comment: Does `last_status` also clear the error status or is it sticky? If it's sticky, then a failed `Write` would also return an error when calling `Close`? 
########## File path: cpp/src/arrow/filesystem/gcsfs_internal.cc ########## @@ -62,6 +65,133 @@ Status ToArrowStatus(const google::cloud::Status& s) { return Status::OK(); } +namespace gcs = ::google::cloud::storage; + +Result<google::cloud::storage::EncryptionKey> ToEncryptionKey( + const std::shared_ptr<const KeyValueMetadata>& metadata) { + if (!metadata) { + return gcs::EncryptionKey{}; + } + + const auto& keys = metadata->keys(); + const auto& values = metadata->values(); + + for (std::size_t i = 0; i < keys.size(); ++i) { + if (keys[i] == "encryptionKeyBase64") { + return gcs::EncryptionKey::FromBase64Key(values[i]); + } + } + return gcs::EncryptionKey{}; +} + +Result<gcs::KmsKeyName> ToKmsKeyName( + const std::shared_ptr<const KeyValueMetadata>& metadata) { + if (!metadata) { + return gcs::KmsKeyName{}; + } + + const auto& keys = metadata->keys(); + const auto& values = metadata->values(); + + for (std::size_t i = 0; i < keys.size(); ++i) { + if (keys[i] == "kmsKeyName") { + return gcs::KmsKeyName(values[i]); + } + } + return gcs::KmsKeyName{}; +} + +Result<gcs::PredefinedAcl> ToPredefinedAcl( + const std::shared_ptr<const KeyValueMetadata>& metadata) { + if (!metadata) { + return gcs::PredefinedAcl{}; + } + + const auto& keys = metadata->keys(); + const auto& values = metadata->values(); + + for (std::size_t i = 0; i < keys.size(); ++i) { + if (keys[i] == "predefinedAcl") { + return gcs::PredefinedAcl(values[i]); + } + } + return gcs::PredefinedAcl{}; +} + +Result<gcs::WithObjectMetadata> ToObjectMetadata( + const std::shared_ptr<const KeyValueMetadata>& metadata) { + if (!metadata) { + return gcs::WithObjectMetadata{}; + } + + static auto const setters = [] { + using setter = std::function<Status(gcs::ObjectMetadata&, const std::string&)>; + return std::map<std::string, setter>{ Review comment: Nit, but why not `std::unordered_map`? 
########## File path: cpp/src/arrow/filesystem/gcsfs.cc ########## @@ -145,6 +189,25 @@ class GcsFileSystem::Impl { return std::make_shared<GcsInputStream>(std::move(stream)); } + Result<std::shared_ptr<io::OutputStream>> OpenOutputStream( + const GcsPath& path, const std::shared_ptr<const KeyValueMetadata>& metadata) { + gcs::EncryptionKey encryption_key; + ARROW_ASSIGN_OR_RAISE(encryption_key, internal::ToEncryptionKey(metadata)); + gcs::PredefinedAcl predefined_acl; + ARROW_ASSIGN_OR_RAISE(predefined_acl, internal::ToPredefinedAcl(metadata)); + gcs::KmsKeyName kms_key_name; + ARROW_ASSIGN_OR_RAISE(kms_key_name, internal::ToKmsKeyName(metadata)); + gcs::WithObjectMetadata with_object_metadata; + ARROW_ASSIGN_OR_RAISE(with_object_metadata, internal::ToObjectMetadata(metadata)); + + auto stream = client_.WriteObject(path.bucket, path.object, encryption_key, + predefined_acl, kms_key_name, with_object_metadata); + if (!stream.last_status().ok()) { + return internal::ToArrowStatus(stream.last_status()); + } Review comment: You might want to define a macro that would allow you to write: ```c++ ARROW_GCS_RETURN_NOT_OK(stream.last_status()); ``` ########## File path: cpp/src/arrow/filesystem/gcsfs.cc ########## @@ -102,6 +109,42 @@ class GcsInputStream : public arrow::io::InputStream { mutable gcs::ObjectReadStream stream_; }; +class GcsOutputStream : public arrow::io::OutputStream { + public: + explicit GcsOutputStream(gcs::ObjectWriteStream stream) : stream_(std::move(stream)) {} + ~GcsOutputStream() override = default; + + Status Close() override { + stream_.Close(); + return internal::ToArrowStatus(stream_.last_status()); + } + + Result<int64_t> Tell() const override { + if (!stream_) { + return Status::IOError("invalid stream"); + } + return tell_; + } + + bool closed() const override { return !stream_.IsOpen(); } + + Status Write(const void* data, int64_t nbytes) override { + if (stream_.write(reinterpret_cast<const char*>(data), nbytes)) { + return Status::OK(); 
+ } + return internal::ToArrowStatus(stream_.last_status()); + } + + Status Flush() override { + stream_.flush(); + return Status::OK(); + } + + private: + gcs::ObjectWriteStream stream_; + int64_t tell_ = 0; Review comment: It appears `tell_` is never updated anywhere. Should you do it in `Write` perhaps? ########## File path: cpp/src/arrow/filesystem/gcsfs_internal.cc ########## @@ -62,6 +65,133 @@ Status ToArrowStatus(const google::cloud::Status& s) { return Status::OK(); } +namespace gcs = ::google::cloud::storage; + +Result<google::cloud::storage::EncryptionKey> ToEncryptionKey( + const std::shared_ptr<const KeyValueMetadata>& metadata) { + if (!metadata) { + return gcs::EncryptionKey{}; + } + + const auto& keys = metadata->keys(); + const auto& values = metadata->values(); + + for (std::size_t i = 0; i < keys.size(); ++i) { + if (keys[i] == "encryptionKeyBase64") { + return gcs::EncryptionKey::FromBase64Key(values[i]); + } + } + return gcs::EncryptionKey{}; +} + +Result<gcs::KmsKeyName> ToKmsKeyName( + const std::shared_ptr<const KeyValueMetadata>& metadata) { + if (!metadata) { + return gcs::KmsKeyName{}; + } + + const auto& keys = metadata->keys(); + const auto& values = metadata->values(); + + for (std::size_t i = 0; i < keys.size(); ++i) { + if (keys[i] == "kmsKeyName") { + return gcs::KmsKeyName(values[i]); + } + } + return gcs::KmsKeyName{}; +} + +Result<gcs::PredefinedAcl> ToPredefinedAcl( + const std::shared_ptr<const KeyValueMetadata>& metadata) { + if (!metadata) { + return gcs::PredefinedAcl{}; + } + + const auto& keys = metadata->keys(); + const auto& values = metadata->values(); + + for (std::size_t i = 0; i < keys.size(); ++i) { + if (keys[i] == "predefinedAcl") { + return gcs::PredefinedAcl(values[i]); + } + } + return gcs::PredefinedAcl{}; +} + +Result<gcs::WithObjectMetadata> ToObjectMetadata( + const std::shared_ptr<const KeyValueMetadata>& metadata) { + if (!metadata) { + return gcs::WithObjectMetadata{}; + } + + static auto const setters = 
[] { + using setter = std::function<Status(gcs::ObjectMetadata&, const std::string&)>; + return std::map<std::string, setter>{ + {"cacheControl", + [](gcs::ObjectMetadata& m, const std::string& v) { + m.set_cache_control(v); + return Status::OK(); + }}, + {"contentDisposition", + [](gcs::ObjectMetadata& m, const std::string& v) { + m.set_content_disposition(v); + return Status::OK(); + }}, + {"contentEncoding", + [](gcs::ObjectMetadata& m, const std::string& v) { + m.set_content_encoding(v); + return Status::OK(); + }}, + {"contentLanguage", + [](gcs::ObjectMetadata& m, const std::string& v) { + m.set_content_language(v); + return Status::OK(); + }}, + {"contentType", + [](gcs::ObjectMetadata& m, const std::string& v) { + m.set_content_type(v); + return Status::OK(); + }}, + {"customTime", + [](gcs::ObjectMetadata& m, const std::string& v) { + std::string err; + absl::Time t; + if (!absl::ParseTime(absl::RFC3339_full, v, &t, &err)) { + return Status::Invalid("Error parsing RFC-3339 timestamp: '", v, "': ", err); + } + m.set_custom_time(absl::ToChronoTime(t)); + return Status::OK(); + }}, + {"storageClass", + [](gcs::ObjectMetadata& m, const std::string& v) { + m.set_storage_class(v); + return Status::OK(); + }}, + {"predefinedAcl", + [](gcs::ObjectMetadata&, const std::string&) { return Status::OK(); }}, + {"encryptionKeyBase64", + [](gcs::ObjectMetadata&, const std::string&) { return Status::OK(); }}, + {"kmsKeyName", + [](gcs::ObjectMetadata&, const std::string&) { return Status::OK(); }}, + }; + }(); + + const auto& keys = metadata->keys(); + const auto& values = metadata->values(); + + gcs::ObjectMetadata object_metadata; + for (std::size_t i = 0; i < keys.size(); ++i) { + auto it = setters.find(keys[i]); + if (it != setters.end()) { + auto status = it->second(object_metadata, values[i]); + if (!status.ok()) return status; + } else { + object_metadata.upsert_metadata(keys[i], values[i]); Review comment: This is meant to allow inserting arbitrary metadata 
strings? Will GCS balk if the user throws some unrecognized metadata keys here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org