This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new da948564 feat(gcs): add support for gcs append (#1801)
da948564 is described below
commit da9485645bcf2e91fb7fa468082eb3065fcdaa72
Author: Will Li <[email protected]>
AuthorDate: Tue Apr 18 21:28:44 2023 +0800
feat(gcs): add support for gcs append (#1801)
* add support for gcs append
* convert to stream upload
* update based on comments
---
core/src/services/gcs/backend.rs | 30 ++++++++++---
core/src/services/gcs/core.rs | 77 +++++++++++++++++++++++++++++++-
core/src/services/gcs/writer.rs | 96 ++++++++++++++++++++++++++++++++++++----
3 files changed, 186 insertions(+), 17 deletions(-)
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 6cfafaad..7e865dcb 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -410,16 +410,32 @@ impl Accessor for GcsBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
- if args.append() {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "append write is not supported",
- ));
- }
+ let upload_location = if args.append() {
+ let resp = self.core.gcs_initiate_resumable_upload(path).await?;
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+ let bs = parse_location(resp.headers())
+ .expect("Failed to retrieve location of resumable upload");
+ if let Some(location) = bs {
+ Some(String::from(location))
+ } else {
+ return Err(Error::new(
+ ErrorKind::NotFound,
+ "location is not in the response header",
+ ));
+ }
+ }
+ _ => return Err(parse_error(resp).await?),
+ }
+ } else {
+ None
+ };
Ok((
RpWrite::default(),
- GcsWriter::new(self.core.clone(), args, path.to_string()),
+ GcsWriter::new(self.core.clone(), args, path.to_string(), upload_location),
))
}
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index 780b178d..18376acd 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -21,9 +21,9 @@ use std::fmt::Write;
use backon::ExponentialBuilder;
use backon::Retryable;
-use bytes::BytesMut;
-use http::header::CONTENT_LENGTH;
+use bytes::{Bytes, BytesMut};
use http::header::CONTENT_TYPE;
+use http::header::{CONTENT_LENGTH, CONTENT_RANGE};
use http::Request;
use http::Response;
use once_cell::sync::Lazy;
@@ -301,4 +301,77 @@ impl GcsCore {
self.send(req).await
}
+
+ pub async fn gcs_initiate_resumable_upload(
+ &self,
+ path: &str,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+ let url = format!(
+ "{}/upload/storage/v1/b/{}/o?uploadType=resumable&name={}",
+ self.endpoint, self.bucket, p
+ );
+ let mut req = Request::post(&url)
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+ self.sign(&mut req).await?;
+ self.send(req).await
+ }
+
+ pub fn gcs_upload_in_resumable_upload(
+ &self,
+ location: &str,
+ size: u64,
+ written_bytes: u64,
+ is_last_part: bool,
+ body: AsyncBody,
+ ) -> Result<Request<AsyncBody>> {
+ let mut req = Request::put(location);
+
+ let range_header = if is_last_part {
+ format!(
+ "bytes {}-{}/{}",
+ written_bytes,
+ written_bytes + size - 1,
+ written_bytes + size
+ )
+ } else {
+ format!("bytes {}-{}/*", written_bytes, written_bytes + size - 1)
+ };
+
+ req = req
+ .header(CONTENT_LENGTH, size)
+ .header(CONTENT_RANGE, range_header);
+
+ // Set body
+ let req = req.body(body).map_err(new_request_build_error)?;
+
+ Ok(req)
+ }
+
+ pub async fn gcs_complete_resumable_upload(
+ &self,
+ location: &str,
+ written_bytes: u64,
+ bs: Bytes,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let size = bs.len() as u64;
+ let mut req = Request::post(location)
+ .header(CONTENT_LENGTH, size)
+ .header(
+ CONTENT_RANGE,
+ format!(
+ "bytes {}-{}/{}",
+ written_bytes,
+ written_bytes + size - 1,
+ written_bytes + size
+ ),
+ )
+ .body(AsyncBody::Bytes(bs))
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+
+ self.send(req).await
+ }
}
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 8f732a1c..b47b2051 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -32,11 +32,28 @@ pub struct GcsWriter {
op: OpWrite,
path: String,
+ location: Option<String>,
+ written_bytes: u64,
+ is_last_part_written: bool,
+ last: Option<Bytes>,
}
impl GcsWriter {
- pub fn new(core: Arc<GcsCore>, op: OpWrite, path: String) -> Self {
- GcsWriter { core, op, path }
+ pub fn new(
+ core: Arc<GcsCore>,
+ op: OpWrite,
+ path: String,
+ upload_location: Option<String>,
+ ) -> Self {
+ GcsWriter {
+ core,
+ op,
+ path,
+ location: upload_location,
+ written_bytes: 0,
+ is_last_part_written: false,
+ last: None,
+ }
}
}
@@ -66,12 +83,47 @@ impl oio::Write for GcsWriter {
}
async fn append(&mut self, bs: Bytes) -> Result<()> {
- let _ = bs;
+ let location = if let Some(location) = &self.location {
+ location
+ } else {
+ return Ok(());
+ };
+
+ let result = if let Some(last) = &self.last {
+ let bytes_to_upload = last.slice(0..last.len());
+ let part_size = bytes_to_upload.len() as u64;
+ let is_last_part = part_size % (256 * 1024) != 0;
+ let mut req = self.core.gcs_upload_in_resumable_upload(
+ location,
+ part_size,
+ self.written_bytes,
+ is_last_part,
+ AsyncBody::Bytes(bytes_to_upload),
+ )?;
+
+ self.core.sign(&mut req).await?;
- Err(Error::new(
- ErrorKind::Unsupported,
- "output writer doesn't support append",
- ))
+ let resp = self.core.send(req).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK | StatusCode::PERMANENT_REDIRECT => {
+ if is_last_part {
+ self.is_last_part_written = true
+ } else {
+ self.written_bytes += part_size;
+ }
+ Ok(())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ } else {
+ Ok(())
+ };
+
+ self.last = Some(bs.slice(0..bs.len()));
+ return result;
}
async fn abort(&mut self) -> Result<()> {
@@ -79,6 +131,34 @@ impl oio::Write for GcsWriter {
}
async fn close(&mut self) -> Result<()> {
- Ok(())
+ if self.is_last_part_written {
+ return Ok(());
+ }
+
+ let location = if let Some(location) = &self.location {
+ location
+ } else {
+ return Ok(());
+ };
+
+ let bs = self
+ .last
+ .as_ref()
+ .expect("failed to get the previously uploaded part");
+
+ let resp = self
+ .core
+ .gcs_complete_resumable_upload(location, self.written_bytes, bs.slice(0..bs.len()))
+ .await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+ resp.into_body().consume().await?;
+ Ok(())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
}
}