pitrou commented on a change in pull request #11550:
URL: https://github.com/apache/arrow/pull/11550#discussion_r740259034



##########
File path: cpp/src/arrow/filesystem/gcsfs_internal.cc
##########
@@ -62,6 +65,133 @@ Status ToArrowStatus(const google::cloud::Status& s) {
   return Status::OK();
 }
 
+namespace gcs = ::google::cloud::storage;
+
+Result<google::cloud::storage::EncryptionKey> ToEncryptionKey(

Review comment:
       Nit: can write `Result<gcs::EncryptionKey>`

##########
File path: cpp/src/arrow/filesystem/gcsfs_internal.cc
##########
@@ -62,6 +65,133 @@ Status ToArrowStatus(const google::cloud::Status& s) {
   return Status::OK();
 }
 
+namespace gcs = ::google::cloud::storage;
+
+Result<google::cloud::storage::EncryptionKey> ToEncryptionKey(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::EncryptionKey{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "encryptionKeyBase64") {
+      return gcs::EncryptionKey::FromBase64Key(values[i]);
+    }
+  }
+  return gcs::EncryptionKey{};
+}
+
+Result<gcs::KmsKeyName> ToKmsKeyName(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::KmsKeyName{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "kmsKeyName") {
+      return gcs::KmsKeyName(values[i]);
+    }
+  }
+  return gcs::KmsKeyName{};
+}
+
+Result<gcs::PredefinedAcl> ToPredefinedAcl(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::PredefinedAcl{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "predefinedAcl") {
+      return gcs::PredefinedAcl(values[i]);
+    }
+  }
+  return gcs::PredefinedAcl{};
+}
+
+Result<gcs::WithObjectMetadata> ToObjectMetadata(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::WithObjectMetadata{};
+  }
+
+  static auto const setters = [] {
+    using setter = std::function<Status(gcs::ObjectMetadata&, const 
std::string&)>;
+    return std::map<std::string, setter>{
+        {"cacheControl",

Review comment:
       Is there a particular reason for using this casing? The S3 
implementation uses "Cache-Control", "Content-Type", etc., so I'd rather keep 
the exact same naming than invent a different convention.

##########
File path: cpp/src/arrow/filesystem/gcsfs_test.cc
##########
@@ -259,6 +352,55 @@ TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) {
   EXPECT_EQ(result.status().code(), StatusCode::IOError);
 }
 
+TEST_F(GcsIntegrationTest, WriteObjectSmall) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  const auto path = kPreexistingBucket + std::string("/test-write-object");
+  std::shared_ptr<io::OutputStream> output;
+  ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {}));
+  const auto expected = std::string(kLoremIpsum);
+  ASSERT_OK(output->Write(expected.data(), expected.size()));
+  ASSERT_OK(output->Close());
+
+  // Verify we can read the object back.
+  std::shared_ptr<io::InputStream> input;
+  ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path));
+
+  std::array<char, 1024> inbuf{};
+  std::int64_t size;
+  ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data()));
+
+  EXPECT_EQ(std::string(inbuf.data(), size), expected);
+}
+
+TEST_F(GcsIntegrationTest, WriteObjectLarge) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  const auto path = kPreexistingBucket + std::string("/test-write-object");
+  std::shared_ptr<io::OutputStream> output;
+  ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {}));
+  const auto b0 = std::string(512 * 1024, 'A');
+  const auto b1 = std::string(768 * 1024, 'B');
+  const auto b2 = std::string(1024 * 1024, 'C');

Review comment:
       Perhaps choose those string sizes so that they are not multiples of the 
underlying buffer size?

##########
File path: cpp/src/arrow/filesystem/gcsfs_test.cc
##########
@@ -259,6 +352,55 @@ TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) {
   EXPECT_EQ(result.status().code(), StatusCode::IOError);
 }
 
+TEST_F(GcsIntegrationTest, WriteObjectSmall) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  const auto path = kPreexistingBucket + std::string("/test-write-object");
+  std::shared_ptr<io::OutputStream> output;
+  ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {}));
+  const auto expected = std::string(kLoremIpsum);
+  ASSERT_OK(output->Write(expected.data(), expected.size()));
+  ASSERT_OK(output->Close());
+
+  // Verify we can read the object back.
+  std::shared_ptr<io::InputStream> input;
+  ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path));
+
+  std::array<char, 1024> inbuf{};
+  std::int64_t size;
+  ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data()));
+
+  EXPECT_EQ(std::string(inbuf.data(), size), expected);
+}
+
+TEST_F(GcsIntegrationTest, WriteObjectLarge) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  const auto path = kPreexistingBucket + std::string("/test-write-object");
+  std::shared_ptr<io::OutputStream> output;
+  ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {}));
+  const auto b0 = std::string(512 * 1024, 'A');
+  const auto b1 = std::string(768 * 1024, 'B');
+  const auto b2 = std::string(1024 * 1024, 'C');
+  ASSERT_OK(output->Write(b0.data(), b0.size()));
+  ASSERT_OK(output->Write(b1.data(), b1.size()));
+  ASSERT_OK(output->Write(b2.data(), b2.size()));
+  ASSERT_OK(output->Close());
+
+  // Verify we can read the object back.
+  std::shared_ptr<io::InputStream> input;
+  ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path));
+
+  std::string contents;
+  std::shared_ptr<Buffer> buffer;
+  do {
+    ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024));
+    contents.append(buffer->ToString());
+  } while (buffer && buffer->size() != 0);

Review comment:
       Nit: `buffer` should never be null in this context. If it is, it's a bug 
in `Read`.

##########
File path: cpp/src/arrow/filesystem/gcsfs_internal.cc
##########
@@ -62,6 +65,133 @@ Status ToArrowStatus(const google::cloud::Status& s) {
   return Status::OK();
 }
 
+namespace gcs = ::google::cloud::storage;
+
+Result<google::cloud::storage::EncryptionKey> ToEncryptionKey(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::EncryptionKey{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "encryptionKeyBase64") {
+      return gcs::EncryptionKey::FromBase64Key(values[i]);
+    }
+  }
+  return gcs::EncryptionKey{};
+}
+
+Result<gcs::KmsKeyName> ToKmsKeyName(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::KmsKeyName{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "kmsKeyName") {
+      return gcs::KmsKeyName(values[i]);
+    }
+  }
+  return gcs::KmsKeyName{};
+}
+
+Result<gcs::PredefinedAcl> ToPredefinedAcl(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::PredefinedAcl{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "predefinedAcl") {
+      return gcs::PredefinedAcl(values[i]);
+    }
+  }
+  return gcs::PredefinedAcl{};
+}
+
+Result<gcs::WithObjectMetadata> ToObjectMetadata(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::WithObjectMetadata{};
+  }
+
+  static auto const setters = [] {
+    using setter = std::function<Status(gcs::ObjectMetadata&, const 
std::string&)>;
+    return std::map<std::string, setter>{
+        {"cacheControl",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_cache_control(v);
+           return Status::OK();
+         }},
+        {"contentDisposition",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_content_disposition(v);
+           return Status::OK();
+         }},
+        {"contentEncoding",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_content_encoding(v);
+           return Status::OK();
+         }},
+        {"contentLanguage",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_content_language(v);
+           return Status::OK();
+         }},
+        {"contentType",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_content_type(v);
+           return Status::OK();
+         }},
+        {"customTime",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           std::string err;
+           absl::Time t;
+           if (!absl::ParseTime(absl::RFC3339_full, v, &t, &err)) {

Review comment:
       Is absl already an include-time dependency of GCS?

##########
File path: cpp/src/arrow/filesystem/gcsfs.cc
##########
@@ -102,6 +109,42 @@ class GcsInputStream : public arrow::io::InputStream {
   mutable gcs::ObjectReadStream stream_;
 };
 
+class GcsOutputStream : public arrow::io::OutputStream {
+ public:
+  explicit GcsOutputStream(gcs::ObjectWriteStream stream) : 
stream_(std::move(stream)) {}
+  ~GcsOutputStream() override = default;
+
+  Status Close() override {
+    stream_.Close();
+    return internal::ToArrowStatus(stream_.last_status());

Review comment:
       Does `last_status` also clear the error status, or is it sticky? If it's 
sticky, would a failed `Write` also cause `Close` to return an error?

##########
File path: cpp/src/arrow/filesystem/gcsfs_internal.cc
##########
@@ -62,6 +65,133 @@ Status ToArrowStatus(const google::cloud::Status& s) {
   return Status::OK();
 }
 
+namespace gcs = ::google::cloud::storage;
+
+Result<google::cloud::storage::EncryptionKey> ToEncryptionKey(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::EncryptionKey{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "encryptionKeyBase64") {
+      return gcs::EncryptionKey::FromBase64Key(values[i]);
+    }
+  }
+  return gcs::EncryptionKey{};
+}
+
+Result<gcs::KmsKeyName> ToKmsKeyName(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::KmsKeyName{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "kmsKeyName") {
+      return gcs::KmsKeyName(values[i]);
+    }
+  }
+  return gcs::KmsKeyName{};
+}
+
+Result<gcs::PredefinedAcl> ToPredefinedAcl(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::PredefinedAcl{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "predefinedAcl") {
+      return gcs::PredefinedAcl(values[i]);
+    }
+  }
+  return gcs::PredefinedAcl{};
+}
+
+Result<gcs::WithObjectMetadata> ToObjectMetadata(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::WithObjectMetadata{};
+  }
+
+  static auto const setters = [] {
+    using setter = std::function<Status(gcs::ObjectMetadata&, const 
std::string&)>;
+    return std::map<std::string, setter>{

Review comment:
       Nit, but why not `std::unordered_map`?

##########
File path: cpp/src/arrow/filesystem/gcsfs.cc
##########
@@ -145,6 +189,25 @@ class GcsFileSystem::Impl {
     return std::make_shared<GcsInputStream>(std::move(stream));
   }
 
+  Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
+      const GcsPath& path, const std::shared_ptr<const KeyValueMetadata>& 
metadata) {
+    gcs::EncryptionKey encryption_key;
+    ARROW_ASSIGN_OR_RAISE(encryption_key, internal::ToEncryptionKey(metadata));
+    gcs::PredefinedAcl predefined_acl;
+    ARROW_ASSIGN_OR_RAISE(predefined_acl, internal::ToPredefinedAcl(metadata));
+    gcs::KmsKeyName kms_key_name;
+    ARROW_ASSIGN_OR_RAISE(kms_key_name, internal::ToKmsKeyName(metadata));
+    gcs::WithObjectMetadata with_object_metadata;
+    ARROW_ASSIGN_OR_RAISE(with_object_metadata, 
internal::ToObjectMetadata(metadata));
+
+    auto stream = client_.WriteObject(path.bucket, path.object, encryption_key,
+                                      predefined_acl, kms_key_name, 
with_object_metadata);
+    if (!stream.last_status().ok()) {
+      return internal::ToArrowStatus(stream.last_status());
+    }

Review comment:
       You might want to define a macro that would allow you to write:
   ```c++
   ARROW_GCS_RETURN_NOT_OK(stream.last_status());
   ```

##########
File path: cpp/src/arrow/filesystem/gcsfs.cc
##########
@@ -102,6 +109,42 @@ class GcsInputStream : public arrow::io::InputStream {
   mutable gcs::ObjectReadStream stream_;
 };
 
+class GcsOutputStream : public arrow::io::OutputStream {
+ public:
+  explicit GcsOutputStream(gcs::ObjectWriteStream stream) : 
stream_(std::move(stream)) {}
+  ~GcsOutputStream() override = default;
+
+  Status Close() override {
+    stream_.Close();
+    return internal::ToArrowStatus(stream_.last_status());
+  }
+
+  Result<int64_t> Tell() const override {
+    if (!stream_) {
+      return Status::IOError("invalid stream");
+    }
+    return tell_;
+  }
+
+  bool closed() const override { return !stream_.IsOpen(); }
+
+  Status Write(const void* data, int64_t nbytes) override {
+    if (stream_.write(reinterpret_cast<const char*>(data), nbytes)) {
+      return Status::OK();
+    }
+    return internal::ToArrowStatus(stream_.last_status());
+  }
+
+  Status Flush() override {
+    stream_.flush();
+    return Status::OK();
+  }
+
+ private:
+  gcs::ObjectWriteStream stream_;
+  int64_t tell_ = 0;

Review comment:
       It appears `tell_` is never updated anywhere. Should you do it in 
`Write` perhaps?

##########
File path: cpp/src/arrow/filesystem/gcsfs_internal.cc
##########
@@ -62,6 +65,133 @@ Status ToArrowStatus(const google::cloud::Status& s) {
   return Status::OK();
 }
 
+namespace gcs = ::google::cloud::storage;
+
+Result<google::cloud::storage::EncryptionKey> ToEncryptionKey(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::EncryptionKey{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "encryptionKeyBase64") {
+      return gcs::EncryptionKey::FromBase64Key(values[i]);
+    }
+  }
+  return gcs::EncryptionKey{};
+}
+
+Result<gcs::KmsKeyName> ToKmsKeyName(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::KmsKeyName{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "kmsKeyName") {
+      return gcs::KmsKeyName(values[i]);
+    }
+  }
+  return gcs::KmsKeyName{};
+}
+
+Result<gcs::PredefinedAcl> ToPredefinedAcl(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::PredefinedAcl{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "predefinedAcl") {
+      return gcs::PredefinedAcl(values[i]);
+    }
+  }
+  return gcs::PredefinedAcl{};
+}
+
+Result<gcs::WithObjectMetadata> ToObjectMetadata(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::WithObjectMetadata{};
+  }
+
+  static auto const setters = [] {
+    using setter = std::function<Status(gcs::ObjectMetadata&, const 
std::string&)>;
+    return std::map<std::string, setter>{
+        {"cacheControl",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_cache_control(v);
+           return Status::OK();
+         }},
+        {"contentDisposition",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_content_disposition(v);
+           return Status::OK();
+         }},
+        {"contentEncoding",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_content_encoding(v);
+           return Status::OK();
+         }},
+        {"contentLanguage",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_content_language(v);
+           return Status::OK();
+         }},
+        {"contentType",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_content_type(v);
+           return Status::OK();
+         }},
+        {"customTime",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           std::string err;
+           absl::Time t;
+           if (!absl::ParseTime(absl::RFC3339_full, v, &t, &err)) {
+             return Status::Invalid("Error parsing RFC-3339 timestamp: '", v, 
"': ", err);
+           }
+           m.set_custom_time(absl::ToChronoTime(t));
+           return Status::OK();
+         }},
+        {"storageClass",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_storage_class(v);
+           return Status::OK();
+         }},
+        {"predefinedAcl",
+         [](gcs::ObjectMetadata&, const std::string&) { return Status::OK(); 
}},
+        {"encryptionKeyBase64",
+         [](gcs::ObjectMetadata&, const std::string&) { return Status::OK(); 
}},
+        {"kmsKeyName",
+         [](gcs::ObjectMetadata&, const std::string&) { return Status::OK(); 
}},
+    };
+  }();
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  gcs::ObjectMetadata object_metadata;
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    auto it = setters.find(keys[i]);
+    if (it != setters.end()) {
+      auto status = it->second(object_metadata, values[i]);
+      if (!status.ok()) return status;
+    } else {
+      object_metadata.upsert_metadata(keys[i], values[i]);

Review comment:
       Is this meant to allow inserting arbitrary metadata strings? Will GCS 
balk if the user passes unrecognized metadata keys here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to