This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new da948564 feat(gcs): add support for gcs append (#1801)
da948564 is described below

commit da9485645bcf2e91fb7fa468082eb3065fcdaa72
Author: Will Li <[email protected]>
AuthorDate: Tue Apr 18 21:28:44 2023 +0800

    feat(gcs): add support for gcs append (#1801)
    
    * add support for gcs append
    
    * convert to stream upload
    
    * update based on comments
---
 core/src/services/gcs/backend.rs | 30 ++++++++++---
 core/src/services/gcs/core.rs    | 77 +++++++++++++++++++++++++++++++-
 core/src/services/gcs/writer.rs  | 96 ++++++++++++++++++++++++++++++++++++----
 3 files changed, 186 insertions(+), 17 deletions(-)

diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 6cfafaad..7e865dcb 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -410,16 +410,37 @@ impl Accessor for GcsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.append() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "append write is not supported",
-            ));
-        }
+        // Append writes are backed by a GCS resumable upload session: start it
+        // here and hand its session URI (the `Location` header) to the writer,
+        // which streams every appended chunk to that URI.
+        let upload_location = if args.append() {
+            let resp = self.core.gcs_initiate_resumable_upload(path).await?;
+
+            match resp.status() {
+                StatusCode::OK => {
+                    // A missing or malformed header is an unexpected service
+                    // response: surface it as an error instead of panicking.
+                    let location = parse_location(resp.headers())?
+                        .ok_or_else(|| {
+                            Error::new(
+                                ErrorKind::Unexpected,
+                                "location is not in the response header",
+                            )
+                        })?
+                        .to_string();
+                    // Drain the response body so the underlying connection can be reused.
+                    resp.into_body().consume().await?;
+                    Some(location)
+                }
+                _ => return Err(parse_error(resp).await?),
+            }
+        } else {
+            None
+        };
 
         Ok((
             RpWrite::default(),
-            GcsWriter::new(self.core.clone(), args, path.to_string()),
+            GcsWriter::new(self.core.clone(), args, path.to_string(), upload_location),
         ))
     }
 
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index 780b178d..18376acd 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -21,9 +21,9 @@ use std::fmt::Write;
 
 use backon::ExponentialBuilder;
 use backon::Retryable;
-use bytes::BytesMut;
-use http::header::CONTENT_LENGTH;
+use bytes::{Bytes, BytesMut};
 use http::header::CONTENT_TYPE;
+use http::header::{CONTENT_LENGTH, CONTENT_RANGE};
 use http::Request;
 use http::Response;
 use once_cell::sync::Lazy;
@@ -301,4 +301,101 @@ impl GcsCore {
 
         self.send(req).await
     }
+
+    /// Starts a resumable upload session for the object at `path`.
+    ///
+    /// On success GCS responds with the session URI in the `Location` header;
+    /// all chunks of the object are then sent to that URI.
+    pub async fn gcs_initiate_resumable_upload(
+        &self,
+        path: &str,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+        // The object name travels in the query string, so characters such as
+        // `?`, `&`, `#` or spaces must be percent-encoded.
+        let url = format!(
+            "{}/upload/storage/v1/b/{}/o?uploadType=resumable&name={}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(&p)
+        );
+        // The initiating request has no body. Send an explicit zero length:
+        // GCS may answer a POST without `Content-Length` with 411.
+        let mut req = Request::post(&url)
+            .header(CONTENT_LENGTH, 0)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+
+    /// Builds (but neither signs nor sends) the `PUT` request that uploads
+    /// one chunk of a resumable upload session.
+    ///
+    /// - `location`: session URI returned when the upload was initiated.
+    /// - `size`: number of bytes carried by `body`.
+    /// - `written_bytes`: object offset of the first byte of `body`.
+    /// - `is_last_part`: when true the total object size is declared, which
+    ///   completes the upload; otherwise the total is sent as `*`.
+    pub fn gcs_upload_in_resumable_upload(
+        &self,
+        location: &str,
+        size: u64,
+        written_bytes: u64,
+        is_last_part: bool,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        Request::put(location)
+            .header(CONTENT_LENGTH, size)
+            .header(
+                CONTENT_RANGE,
+                Self::resumable_content_range(written_bytes, size, is_last_part),
+            )
+            .body(body)
+            .map_err(new_request_build_error)
+    }
+
+    /// Uploads `bs` as the final chunk of a resumable upload session, declaring
+    /// a total size of `written_bytes + bs.len()` and thereby completing it.
+    ///
+    /// GCS documents `PUT` (not `POST`) for sending data to a session URI, so
+    /// this reuses the chunk request builder with `is_last_part = true`.
+    pub async fn gcs_complete_resumable_upload(
+        &self,
+        location: &str,
+        written_bytes: u64,
+        bs: Bytes,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let size = bs.len() as u64;
+        let mut req = self.gcs_upload_in_resumable_upload(
+            location,
+            size,
+            written_bytes,
+            true,
+            AsyncBody::Bytes(bs),
+        )?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    /// Formats the `Content-Range` header of a resumable-upload chunk holding
+    /// `size` bytes that start at object offset `written_bytes`.
+    ///
+    /// An empty chunk has no byte range, so it is written as `bytes */TOTAL`
+    /// (or `bytes */*`) instead of underflowing on `size - 1`.
+    fn resumable_content_range(written_bytes: u64, size: u64, is_last_part: bool) -> String {
+        let total = if is_last_part {
+            (written_bytes + size).to_string()
+        } else {
+            "*".to_string()
+        };
+
+        if size == 0 {
+            format!("bytes */{}", total)
+        } else {
+            format!("bytes {}-{}/{}", written_bytes, written_bytes + size - 1, total)
+        }
+    }
 }
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 8f732a1c..b47b2051 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -32,11 +32,28 @@ pub struct GcsWriter {
 
     op: OpWrite,
     path: String,
+    location: Option<String>, // Resumable-upload session URI; `None` for non-append writes.
+    written_bytes: u64, // Object offset (in bytes) at which the next chunk is uploaded.
+    is_last_part_written: bool, // Set once the chunk declaring the total size has been sent.
+    last: Option<Bytes>, // Appended data that has not been uploaded yet.
 }
 
 impl GcsWriter {
-    pub fn new(core: Arc<GcsCore>, op: OpWrite, path: String) -> Self {
-        GcsWriter { core, op, path }
+    pub fn new(
+        core: Arc<GcsCore>,
+        op: OpWrite,
+        path: String,
+        upload_location: Option<String>, // `Some` only for writes opened in append mode.
+    ) -> Self {
+        GcsWriter {
+            core,
+            op,
+            path,
+            location: upload_location,
+            written_bytes: 0,
+            is_last_part_written: false,
+            last: None,
+        }
     }
 }
 
@@ -66,12 +83,72 @@ impl oio::Write for GcsWriter {
     }
 
     async fn append(&mut self, bs: Bytes) -> Result<()> {
-        let _ = bs;
+        // GCS requires every chunk except the final one to be a multiple of
+        // 256 KiB.
+        const CHUNK_ALIGNMENT: usize = 256 * 1024;
+
+        // Only writers opened in append mode own a resumable upload session;
+        // returning Ok here would silently drop the caller's data.
+        let location = match &self.location {
+            Some(location) => location,
+            None => {
+                return Err(Error::new(
+                    ErrorKind::Unsupported,
+                    "output writer doesn't support append",
+                ))
+            }
+        };
+
+        if self.is_last_part_written {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "resumable upload has already been completed",
+            ));
+        }
+
+        // Merge the new data with the not-yet-uploaded tail of earlier appends.
+        let pending = match self.last.take() {
+            Some(last) if !last.is_empty() => {
+                let mut buf = Vec::with_capacity(last.len() + bs.len());
+                buf.extend_from_slice(&last);
+                buf.extend_from_slice(&bs);
+                Bytes::from(buf)
+            }
+            _ => bs,
+        };
+        // Keep everything buffered until the upload below has succeeded.
+        self.last = Some(pending.clone());
+
+        // Upload the largest 256 KiB-aligned prefix, but always hold back a
+        // non-empty tail: only `close` knows which chunk is the final one.
+        let mut split = pending.len() / CHUNK_ALIGNMENT * CHUNK_ALIGNMENT;
+        if split == pending.len() && split > 0 {
+            split -= CHUNK_ALIGNMENT;
+        }
+        if split == 0 {
+            return Ok(());
+        }
 
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support append",
-        ))
+        let mut req = self.core.gcs_upload_in_resumable_upload(
+            location,
+            split as u64,
+            self.written_bytes,
+            false,
+            AsyncBody::Bytes(pending.slice(..split)),
+        )?;
+        self.core.sign(&mut req).await?;
+
+        let resp = self.core.send(req).await?;
+
+        match resp.status() {
+            StatusCode::OK | StatusCode::PERMANENT_REDIRECT => {
+                resp.into_body().consume().await?;
+                self.written_bytes += split as u64;
+                self.last = Some(pending.slice(split..));
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
     }
 
     async fn abort(&mut self) -> Result<()> {
@@ -79,6 +131,36 @@ impl oio::Write for GcsWriter {
     }
 
     async fn close(&mut self) -> Result<()> {
-        Ok(())
+        // Plain (non-append) writes have no upload session to complete.
+        let location = match &self.location {
+            Some(location) => location,
+            None => return Ok(()),
+        };
+
+        // The session is already complete; closing again is a no-op.
+        if self.is_last_part_written {
+            return Ok(());
+        }
+
+        // Everything still buffered becomes the final chunk, which declares the
+        // total object size and completes the upload. It may be empty, e.g.
+        // when nothing was appended.
+        let bs = self.last.clone().unwrap_or_default();
+
+        let resp = self
+            .core
+            .gcs_complete_resumable_upload(location, self.written_bytes, bs)
+            .await?;
+
+        match resp.status() {
+            // GCS answers `200 OK` or `201 Created` once the object is stored.
+            StatusCode::OK | StatusCode::CREATED => {
+                resp.into_body().consume().await?;
+                self.is_last_part_written = true;
+                self.last = None;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
     }
 }

Reply via email to