coryan commented on a change in pull request #11550:
URL: https://github.com/apache/arrow/pull/11550#discussion_r743104899



##########
File path: cpp/src/arrow/filesystem/gcsfs.cc
##########
@@ -102,6 +109,42 @@ class GcsInputStream : public arrow::io::InputStream {
   mutable gcs::ObjectReadStream stream_;
 };
 
+class GcsOutputStream : public arrow::io::OutputStream {
+ public:
+  explicit GcsOutputStream(gcs::ObjectWriteStream stream) : 
stream_(std::move(stream)) {}
+  ~GcsOutputStream() override = default;
+
+  Status Close() override {
+    stream_.Close();
+    return internal::ToArrowStatus(stream_.last_status());
+  }
+
+  Result<int64_t> Tell() const override {
+    if (!stream_) {
+      return Status::IOError("invalid stream");
+    }
+    return tell_;
+  }
+
+  bool closed() const override { return !stream_.IsOpen(); }
+
+  Status Write(const void* data, int64_t nbytes) override {
+    if (stream_.write(reinterpret_cast<const char*>(data), nbytes)) {
+      return Status::OK();
+    }
+    return internal::ToArrowStatus(stream_.last_status());
+  }
+
+  Status Flush() override {
+    stream_.flush();
+    return Status::OK();
+  }
+
+ private:
+  gcs::ObjectWriteStream stream_;
+  int64_t tell_ = 0;

Review comment:
       Fixed. Thanks.

##########
File path: cpp/src/arrow/filesystem/gcsfs.cc
##########
@@ -102,6 +109,42 @@ class GcsInputStream : public arrow::io::InputStream {
   mutable gcs::ObjectReadStream stream_;
 };
 
+class GcsOutputStream : public arrow::io::OutputStream {
+ public:
+  explicit GcsOutputStream(gcs::ObjectWriteStream stream) : 
stream_(std::move(stream)) {}
+  ~GcsOutputStream() override = default;
+
+  Status Close() override {
+    stream_.Close();
+    return internal::ToArrowStatus(stream_.last_status());

Review comment:
       It is sticky.  And yes, a failed `Write()` will make subsequent 
`Close()` fail.  It is inadvisable to finalize a stream that has failed: you 
don't know what state it is in.
   

##########
File path: cpp/src/arrow/filesystem/gcsfs.cc
##########
@@ -145,6 +189,25 @@ class GcsFileSystem::Impl {
     return std::make_shared<GcsInputStream>(std::move(stream));
   }
 
+  Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
+      const GcsPath& path, const std::shared_ptr<const KeyValueMetadata>& 
metadata) {
+    gcs::EncryptionKey encryption_key;
+    ARROW_ASSIGN_OR_RAISE(encryption_key, internal::ToEncryptionKey(metadata));
+    gcs::PredefinedAcl predefined_acl;
+    ARROW_ASSIGN_OR_RAISE(predefined_acl, internal::ToPredefinedAcl(metadata));
+    gcs::KmsKeyName kms_key_name;
+    ARROW_ASSIGN_OR_RAISE(kms_key_name, internal::ToKmsKeyName(metadata));
+    gcs::WithObjectMetadata with_object_metadata;
+    ARROW_ASSIGN_OR_RAISE(with_object_metadata, 
internal::ToObjectMetadata(metadata));
+
+    auto stream = client_.WriteObject(path.bucket, path.object, encryption_key,
+                                      predefined_acl, kms_key_name, 
with_object_metadata);
+    if (!stream.last_status().ok()) {
+      return internal::ToArrowStatus(stream.last_status());
+    }

Review comment:
       Done.

##########
File path: cpp/src/arrow/filesystem/gcsfs_internal.cc
##########
@@ -62,6 +65,133 @@ Status ToArrowStatus(const google::cloud::Status& s) {
   return Status::OK();
 }
 
+namespace gcs = ::google::cloud::storage;
+
+Result<google::cloud::storage::EncryptionKey> ToEncryptionKey(

Review comment:
       Done.

##########
File path: cpp/src/arrow/filesystem/gcsfs_internal.cc
##########
@@ -62,6 +65,133 @@ Status ToArrowStatus(const google::cloud::Status& s) {
   return Status::OK();
 }
 
+namespace gcs = ::google::cloud::storage;
+
+Result<google::cloud::storage::EncryptionKey> ToEncryptionKey(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::EncryptionKey{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "encryptionKeyBase64") {
+      return gcs::EncryptionKey::FromBase64Key(values[i]);
+    }
+  }
+  return gcs::EncryptionKey{};
+}
+
+Result<gcs::KmsKeyName> ToKmsKeyName(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::KmsKeyName{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "kmsKeyName") {
+      return gcs::KmsKeyName(values[i]);
+    }
+  }
+  return gcs::KmsKeyName{};
+}
+
+Result<gcs::PredefinedAcl> ToPredefinedAcl(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::PredefinedAcl{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "predefinedAcl") {
+      return gcs::PredefinedAcl(values[i]);
+    }
+  }
+  return gcs::PredefinedAcl{};
+}
+
+Result<gcs::WithObjectMetadata> ToObjectMetadata(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::WithObjectMetadata{};
+  }
+
+  static auto const setters = [] {
+    using setter = std::function<Status(gcs::ObjectMetadata&, const 
std::string&)>;
+    return std::map<std::string, setter>{
+        {"cacheControl",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_cache_control(v);
+           return Status::OK();
+         }},
+        {"contentDisposition",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_content_disposition(v);
+           return Status::OK();
+         }},
+        {"contentEncoding",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_content_encoding(v);
+           return Status::OK();
+         }},
+        {"contentLanguage",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_content_language(v);
+           return Status::OK();
+         }},
+        {"contentType",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_content_type(v);
+           return Status::OK();
+         }},
+        {"customTime",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           std::string err;
+           absl::Time t;
+           if (!absl::ParseTime(absl::RFC3339_full, v, &t, &err)) {

Review comment:
       Yes.  Some Abseil types are exposed in the public API for 
`google-cloud-cpp`.

##########
File path: cpp/src/arrow/filesystem/gcsfs_internal.cc
##########
@@ -62,6 +65,133 @@ Status ToArrowStatus(const google::cloud::Status& s) {
   return Status::OK();
 }
 
+namespace gcs = ::google::cloud::storage;
+
+Result<google::cloud::storage::EncryptionKey> ToEncryptionKey(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::EncryptionKey{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "encryptionKeyBase64") {
+      return gcs::EncryptionKey::FromBase64Key(values[i]);
+    }
+  }
+  return gcs::EncryptionKey{};
+}
+
+Result<gcs::KmsKeyName> ToKmsKeyName(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::KmsKeyName{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "kmsKeyName") {
+      return gcs::KmsKeyName(values[i]);
+    }
+  }
+  return gcs::KmsKeyName{};
+}
+
+Result<gcs::PredefinedAcl> ToPredefinedAcl(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::PredefinedAcl{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "predefinedAcl") {
+      return gcs::PredefinedAcl(values[i]);
+    }
+  }
+  return gcs::PredefinedAcl{};
+}
+
+Result<gcs::WithObjectMetadata> ToObjectMetadata(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::WithObjectMetadata{};
+  }
+
+  static auto const setters = [] {
+    using setter = std::function<Status(gcs::ObjectMetadata&, const 
std::string&)>;
+    return std::map<std::string, setter>{
+        {"cacheControl",

Review comment:
       These are the names of the fields in the GCS metadata:
   
   https://cloud.google.com/storage/docs/json_api/v1/objects
   
   It seems odd to make them match the protocol headers over HTTP (and the S3 
API at that) :shrug:
   
   I have changed them, but I can change them back.

##########
File path: cpp/src/arrow/filesystem/gcsfs_internal.cc
##########
@@ -62,6 +65,133 @@ Status ToArrowStatus(const google::cloud::Status& s) {
   return Status::OK();
 }
 
+namespace gcs = ::google::cloud::storage;
+
+Result<google::cloud::storage::EncryptionKey> ToEncryptionKey(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::EncryptionKey{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "encryptionKeyBase64") {
+      return gcs::EncryptionKey::FromBase64Key(values[i]);
+    }
+  }
+  return gcs::EncryptionKey{};
+}
+
+Result<gcs::KmsKeyName> ToKmsKeyName(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::KmsKeyName{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "kmsKeyName") {
+      return gcs::KmsKeyName(values[i]);
+    }
+  }
+  return gcs::KmsKeyName{};
+}
+
+Result<gcs::PredefinedAcl> ToPredefinedAcl(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::PredefinedAcl{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "predefinedAcl") {
+      return gcs::PredefinedAcl(values[i]);
+    }
+  }
+  return gcs::PredefinedAcl{};
+}
+
+Result<gcs::WithObjectMetadata> ToObjectMetadata(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::WithObjectMetadata{};
+  }
+
+  static auto const setters = [] {
+    using setter = std::function<Status(gcs::ObjectMetadata&, const 
std::string&)>;
+    return std::map<std::string, setter>{

Review comment:
       Sure, done.

##########
File path: cpp/src/arrow/filesystem/gcsfs_internal.cc
##########
@@ -62,6 +65,133 @@ Status ToArrowStatus(const google::cloud::Status& s) {
   return Status::OK();
 }
 
+namespace gcs = ::google::cloud::storage;
+
+Result<google::cloud::storage::EncryptionKey> ToEncryptionKey(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::EncryptionKey{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "encryptionKeyBase64") {
+      return gcs::EncryptionKey::FromBase64Key(values[i]);
+    }
+  }
+  return gcs::EncryptionKey{};
+}
+
+Result<gcs::KmsKeyName> ToKmsKeyName(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::KmsKeyName{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "kmsKeyName") {
+      return gcs::KmsKeyName(values[i]);
+    }
+  }
+  return gcs::KmsKeyName{};
+}
+
+Result<gcs::PredefinedAcl> ToPredefinedAcl(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::PredefinedAcl{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "predefinedAcl") {
+      return gcs::PredefinedAcl(values[i]);
+    }
+  }
+  return gcs::PredefinedAcl{};
+}
+
+Result<gcs::WithObjectMetadata> ToObjectMetadata(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::WithObjectMetadata{};
+  }
+
+  static auto const setters = [] {
+    using setter = std::function<Status(gcs::ObjectMetadata&, const 
std::string&)>;
+    return std::map<std::string, setter>{
+        {"cacheControl",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_cache_control(v);
+           return Status::OK();
+         }},
+        {"contentDisposition",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_content_disposition(v);
+           return Status::OK();
+         }},
+        {"contentEncoding",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_content_encoding(v);
+           return Status::OK();
+         }},
+        {"contentLanguage",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_content_language(v);
+           return Status::OK();
+         }},
+        {"contentType",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_content_type(v);
+           return Status::OK();
+         }},
+        {"customTime",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           std::string err;
+           absl::Time t;
+           if (!absl::ParseTime(absl::RFC3339_full, v, &t, &err)) {
+             return Status::Invalid("Error parsing RFC-3339 timestamp: '", v, 
"': ", err);
+           }
+           m.set_custom_time(absl::ToChronoTime(t));
+           return Status::OK();
+         }},
+        {"storageClass",
+         [](gcs::ObjectMetadata& m, const std::string& v) {
+           m.set_storage_class(v);
+           return Status::OK();
+         }},
+        {"predefinedAcl",
+         [](gcs::ObjectMetadata&, const std::string&) { return Status::OK(); 
}},
+        {"encryptionKeyBase64",
+         [](gcs::ObjectMetadata&, const std::string&) { return Status::OK(); 
}},
+        {"kmsKeyName",
+         [](gcs::ObjectMetadata&, const std::string&) { return Status::OK(); 
}},
+    };
+  }();
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  gcs::ObjectMetadata object_metadata;
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    auto it = setters.find(keys[i]);
+    if (it != setters.end()) {
+      auto status = it->second(object_metadata, values[i]);
+      if (!status.ok()) return status;
+    } else {
+      object_metadata.upsert_metadata(keys[i], values[i]);

Review comment:
       GCS accepts (mostly) arbitrary metadata keys:
   
   https://cloud.google.com/storage/docs/metadata#custom-metadata

##########
File path: cpp/src/arrow/filesystem/gcsfs_test.cc
##########
@@ -259,6 +352,55 @@ TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) {
   EXPECT_EQ(result.status().code(), StatusCode::IOError);
 }
 
+TEST_F(GcsIntegrationTest, WriteObjectSmall) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  const auto path = kPreexistingBucket + std::string("/test-write-object");
+  std::shared_ptr<io::OutputStream> output;
+  ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {}));
+  const auto expected = std::string(kLoremIpsum);
+  ASSERT_OK(output->Write(expected.data(), expected.size()));
+  ASSERT_OK(output->Close());
+
+  // Verify we can read the object back.
+  std::shared_ptr<io::InputStream> input;
+  ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path));
+
+  std::array<char, 1024> inbuf{};
+  std::int64_t size;
+  ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data()));
+
+  EXPECT_EQ(std::string(inbuf.data(), size), expected);
+}
+
+TEST_F(GcsIntegrationTest, WriteObjectLarge) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  const auto path = kPreexistingBucket + std::string("/test-write-object");
+  std::shared_ptr<io::OutputStream> output;
+  ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {}));
+  const auto b0 = std::string(512 * 1024, 'A');
+  const auto b1 = std::string(768 * 1024, 'B');
+  const auto b2 = std::string(1024 * 1024, 'C');

Review comment:
       Done.

##########
File path: cpp/src/arrow/filesystem/gcsfs_test.cc
##########
@@ -259,6 +352,55 @@ TEST_F(GcsIntegrationTest, ReadObjectInfoInvalid) {
   EXPECT_EQ(result.status().code(), StatusCode::IOError);
 }
 
+TEST_F(GcsIntegrationTest, WriteObjectSmall) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  const auto path = kPreexistingBucket + std::string("/test-write-object");
+  std::shared_ptr<io::OutputStream> output;
+  ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {}));
+  const auto expected = std::string(kLoremIpsum);
+  ASSERT_OK(output->Write(expected.data(), expected.size()));
+  ASSERT_OK(output->Close());
+
+  // Verify we can read the object back.
+  std::shared_ptr<io::InputStream> input;
+  ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path));
+
+  std::array<char, 1024> inbuf{};
+  std::int64_t size;
+  ASSERT_OK_AND_ASSIGN(size, input->Read(inbuf.size(), inbuf.data()));
+
+  EXPECT_EQ(std::string(inbuf.data(), size), expected);
+}
+
+TEST_F(GcsIntegrationTest, WriteObjectLarge) {
+  auto fs = internal::MakeGcsFileSystemForTest(TestGcsOptions());
+
+  const auto path = kPreexistingBucket + std::string("/test-write-object");
+  std::shared_ptr<io::OutputStream> output;
+  ASSERT_OK_AND_ASSIGN(output, fs->OpenOutputStream(path, {}));
+  const auto b0 = std::string(512 * 1024, 'A');
+  const auto b1 = std::string(768 * 1024, 'B');
+  const auto b2 = std::string(1024 * 1024, 'C');
+  ASSERT_OK(output->Write(b0.data(), b0.size()));
+  ASSERT_OK(output->Write(b1.data(), b1.size()));
+  ASSERT_OK(output->Write(b2.data(), b2.size()));
+  ASSERT_OK(output->Close());
+
+  // Verify we can read the object back.
+  std::shared_ptr<io::InputStream> input;
+  ASSERT_OK_AND_ASSIGN(input, fs->OpenInputStream(path));
+
+  std::string contents;
+  std::shared_ptr<Buffer> buffer;
+  do {
+    ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024));
+    contents.append(buffer->ToString());
+  } while (buffer && buffer->size() != 0);

Review comment:
       Fixed.

##########
File path: cpp/src/arrow/filesystem/gcsfs_internal.cc
##########
@@ -62,6 +65,133 @@ Status ToArrowStatus(const google::cloud::Status& s) {
   return Status::OK();
 }
 
+namespace gcs = ::google::cloud::storage;
+
+Result<google::cloud::storage::EncryptionKey> ToEncryptionKey(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::EncryptionKey{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "encryptionKeyBase64") {
+      return gcs::EncryptionKey::FromBase64Key(values[i]);
+    }
+  }
+  return gcs::EncryptionKey{};
+}
+
+Result<gcs::KmsKeyName> ToKmsKeyName(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::KmsKeyName{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "kmsKeyName") {
+      return gcs::KmsKeyName(values[i]);
+    }
+  }
+  return gcs::KmsKeyName{};
+}
+
+Result<gcs::PredefinedAcl> ToPredefinedAcl(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::PredefinedAcl{};
+  }
+
+  const auto& keys = metadata->keys();
+  const auto& values = metadata->values();
+
+  for (std::size_t i = 0; i < keys.size(); ++i) {
+    if (keys[i] == "predefinedAcl") {
+      return gcs::PredefinedAcl(values[i]);
+    }
+  }
+  return gcs::PredefinedAcl{};
+}
+
+Result<gcs::WithObjectMetadata> ToObjectMetadata(
+    const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  if (!metadata) {
+    return gcs::WithObjectMetadata{};
+  }
+
+  static auto const setters = [] {
+    using setter = std::function<Status(gcs::ObjectMetadata&, const 
std::string&)>;
+    return std::map<std::string, setter>{
+        {"cacheControl",

Review comment:
       SGTM.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to