This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch stream-based-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit ab919a94442be0e5e87e3fc5a4fbf7782669e1a3
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 30 18:01:44 2023 +0800

    Migrate azblob
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/azblob/backend.rs |  30 ++++++--
 core/src/services/azblob/writer.rs  | 138 +++++++++++++-----------------------
 2 files changed, 75 insertions(+), 93 deletions(-)

diff --git a/core/src/services/azblob/backend.rs 
b/core/src/services/azblob/backend.rs
index fbd480de6..3a437b6c8 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -38,6 +38,7 @@ use super::pager::AzblobPager;
 use super::writer::AzblobWriter;
 use crate::raw::*;
 use crate::services::azblob::core::AzblobCore;
+use crate::services::azblob::writer::AzblobWriters;
 use crate::types::Metadata;
 use crate::*;
 
@@ -52,6 +53,7 @@ const KNOWN_AZBLOB_ENDPOINT_SUFFIX: &[&str] = &[
 ];
 
 const AZBLOB_BATCH_LIMIT: usize = 256;
+
 /// Azure Storage Blob services support.
 #[doc = include_str!("docs.md")]
 #[derive(Default, Clone)]
@@ -506,7 +508,7 @@ pub struct AzblobBackend {
 impl Accessor for AzblobBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = AzblobWriter;
+    type Writer = oio::TwoWaysWriter<AzblobWriters, 
oio::AtLeastBufWriter<AzblobWriters>>;
     type BlockingWriter = ();
     type Pager = AzblobPager;
     type BlockingPager = ();
@@ -602,10 +604,28 @@ impl Accessor for AzblobBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        Ok((
-            RpWrite::default(),
-            AzblobWriter::new(self.core.clone(), args, path.to_string()),
-        ))
+        let writer = AzblobWriter::new(self.core.clone(), path, args.clone());
+
+        let w = if args.content_length().is_some() {
+            AzblobWriters::One(oio::OneShotWriter::new(writer))
+        } else if args.append() {
+            AzblobWriters::Two(oio::AppendObjectWriter::new(writer))
+        } else {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "azblob write without neither content-length nor append is not 
supported yet",
+            ));
+        };
+
+        let w = if let Some(buffer_size) = args.buffer_size() {
+            oio::TwoWaysWriter::Two(
+                oio::AtLeastBufWriter::new(w, 
buffer_size).with_total_size(args.content_length()),
+            )
+        } else {
+            oio::TwoWaysWriter::One(w)
+        };
+
+        Ok((RpWrite::default(), w))
     }
 
     async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> 
Result<RpCopy> {
diff --git a/core/src/services/azblob/writer.rs 
b/core/src/services/azblob/writer.rs
index c2519294a..7e121cd05 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -22,39 +22,42 @@ use http::StatusCode;
 
 use super::core::AzblobCore;
 use super::error::parse_error;
-use crate::raw::oio::Stream;
+use crate::raw::oio::{Stream, Streamer};
 use crate::raw::*;
 use crate::*;
 
 const X_MS_BLOB_TYPE: &str = "x-ms-blob-type";
 const X_MS_BLOB_APPEND_OFFSET: &str = "x-ms-blob-append-offset";
 
+pub type AzblobWriters =
+    oio::TwoWaysWriter<oio::OneShotWriter<AzblobWriter>, 
oio::AppendObjectWriter<AzblobWriter>>;
+
 pub struct AzblobWriter {
     core: Arc<AzblobCore>,
 
-    op: OpWrite,
     path: String,
-
-    position: Option<u64>,
+    op: OpWrite,
 }
 
 impl AzblobWriter {
-    pub fn new(core: Arc<AzblobCore>, op: OpWrite, path: String) -> Self {
+    pub fn new(core: Arc<AzblobCore>, path: &str, op: OpWrite) -> Self {
         AzblobWriter {
             core,
+            path: path.to_string(),
             op,
-            path,
-            position: None,
         }
     }
+}
 
-    async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> {
+#[async_trait]
+impl oio::OneShotWrite for AzblobWriter {
+    async fn write_once(&self, stream: Streamer) -> Result<()> {
         let mut req = self.core.azblob_put_blob_request(
             &self.path,
-            Some(size),
+            Some(stream.size()),
             self.op.content_type(),
             self.op.cache_control(),
-            body,
+            AsyncBody::Stream(stream),
         )?;
 
         self.core.sign(&mut req).await?;
@@ -71,13 +74,11 @@ impl AzblobWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
+}
 
-    async fn current_position(&mut self) -> Result<Option<u64>> {
-        if let Some(v) = self.position {
-            return Ok(Some(v));
-        }
-
-        // TODO: we should check with current etag to make sure file not 
changed.
+#[async_trait]
+impl oio::AppendObjectWrite for AzblobWriter {
+    async fn offset(&self) -> Result<u64> {
         let resp = self
             .core
             .azblob_get_blob_properties(&self.path, None, None)
@@ -86,9 +87,6 @@ impl AzblobWriter {
         let status = resp.status();
 
         match status {
-            // Just check the blob type.
-            // If it is not an appendable blob, return an error.
-            // We can not get the append position of the blob here.
             StatusCode::OK => {
                 let headers = resp.headers();
                 let blob_type = headers.get(X_MS_BLOB_TYPE).and_then(|v| 
v.to_str().ok());
@@ -98,43 +96,44 @@ impl AzblobWriter {
                         "the blob is not an appendable blob.",
                     ));
                 }
-                Ok(None)
-            }
-            // If the blob is not existing, we need to create one.
-            StatusCode::NOT_FOUND => {
-                let mut req = self.core.azblob_init_appendable_blob_request(
-                    &self.path,
-                    self.op.content_type(),
-                    self.op.cache_control(),
-                )?;
-
-                self.core.sign(&mut req).await?;
-
-                let resp = self.core.client.send(req).await?;
-
-                let status = resp.status();
-                match status {
-                    StatusCode::CREATED => {
-                        // do nothing
-                    }
-                    _ => {
-                        return Err(parse_error(resp).await?);
-                    }
-                }
 
-                self.position = Some(0);
-                Ok(Some(0))
+                parse_content_length(headers).map(|v| v.unwrap_or_default())
             }
+            // Return 0 if the blob is not existing.
+            StatusCode::NOT_FOUND => Ok(0),
             _ => Err(parse_error(resp).await?),
         }
     }
 
-    async fn append_oneshot(&mut self, size: u64, body: AsyncBody) -> 
Result<()> {
-        let _ = self.current_position().await?;
+    async fn append(&self, offset: u64, stream: Streamer) -> Result<()> {
+        // Init appendable blob if we are writing to a file with offset 0.
+        if offset == 0 {
+            let mut req = self.core.azblob_init_appendable_blob_request(
+                &self.path,
+                self.op.content_type(),
+                self.op.cache_control(),
+            )?;
+
+            self.core.sign(&mut req).await?;
+
+            let resp = self.core.client.send(req).await?;
+            let status = resp.status();
+            match status {
+                StatusCode::CREATED => {
+                    // do nothing
+                }
+                _ => {
+                    return Err(parse_error(resp).await?);
+                }
+            }
+        }
 
-        let mut req =
-            self.core
-                .azblob_append_blob_request(&self.path, size, self.position, 
body)?;
+        let mut req = self.core.azblob_append_blob_request(
+            &self.path,
+            stream.size(),
+            Some(offset),
+            AsyncBody::Stream(stream),
+        )?;
 
         self.core.sign(&mut req).await?;
 
@@ -142,45 +141,8 @@ impl AzblobWriter {
 
         let status = resp.status();
         match status {
-            StatusCode::CREATED => {
-                let headers = resp.headers();
-                let position = headers
-                    .get(X_MS_BLOB_APPEND_OFFSET)
-                    .and_then(|v| v.to_str().ok())
-                    .and_then(|v| v.parse::<u64>().ok());
-                self.position = position.map(|v| v + size);
-            }
-            _ => {
-                return Err(parse_error(resp).await?);
-            }
-        }
-
-        Ok(())
-    }
-}
-
-#[async_trait]
-impl oio::Write for AzblobWriter {
-    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
-        if self.op.append() {
-            self.append_oneshot(s.size(), AsyncBody::Stream(s)).await
-        } else {
-            if self.op.content_length().is_none() {
-                return Err(Error::new(
-                    ErrorKind::Unsupported,
-                    "write without content length is not supported",
-                ));
-            }
-
-            self.write_oneshot(s.size(), AsyncBody::Stream(s)).await
+            StatusCode::CREATED => Ok(()),
+            _ => Err(parse_error(resp).await?),
         }
     }
-
-    async fn abort(&mut self) -> Result<()> {
-        Ok(())
-    }
-
-    async fn close(&mut self) -> Result<()> {
-        Ok(())
-    }
 }

Reply via email to