This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch poll-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 90292ce9d95cd3d78853ef0d86a4dfd6e75024ae
Author: Xuanwo <[email protected]>
AuthorDate: Sat Sep 9 03:12:52 2023 +0800

    Fix azblob
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/azblob/backend.rs | 15 ++++---
 core/src/services/azblob/core.rs    | 15 +------
 core/src/services/azblob/writer.rs  | 84 +++++++++++--------------------------
 3 files changed, 37 insertions(+), 77 deletions(-)

diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs
index 4b04d5c7e..7e5fe259e 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -38,6 +38,7 @@ use super::pager::AzblobPager;
 use super::writer::AzblobWriter;
 use crate::raw::*;
 use crate::services::azblob::core::AzblobCore;
+use crate::services::azblob::writer::AzblobWriters;
 use crate::types::Metadata;
 use crate::*;
 
@@ -506,7 +507,7 @@ pub struct AzblobBackend {
 impl Accessor for AzblobBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = AzblobWriter;
+    type Writer = AzblobWriters;
     type BlockingWriter = ();
     type Pager = AzblobPager;
     type BlockingPager = ();
@@ -601,10 +602,14 @@ impl Accessor for AzblobBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        Ok((
-            RpWrite::default(),
-            AzblobWriter::new(self.core.clone(), args, path.to_string()),
-        ))
+        let w = AzblobWriter::new(self.core.clone(), args.clone(), path.to_string());
+        let w = if args.append() {
+            AzblobWriters::Two(oio::AppendObjectWriter::new(w))
+        } else {
+            AzblobWriters::One(oio::OneShotWriter::new(w))
+        };
+
+        Ok((RpWrite::default(), w))
     }
 
     async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
diff --git a/core/src/services/azblob/core.rs b/core/src/services/azblob/core.rs
index 6382817bb..5eb46f464 100644
--- a/core/src/services/azblob/core.rs
+++ b/core/src/services/azblob/core.rs
@@ -352,12 +352,6 @@ impl AzblobCore {
     ///
     /// - The maximum size of the content could be appended is 4MB.
     /// - `Append Block` succeeds only if the blob already exists.
-    /// - It does not need to provide append position.
-    /// - But it could use append position to verify the content is appended to the right position.
-    ///
-    /// Since the `appendpos` only returned by the append operation response,
-    /// we could not use it when we want to append content to the blob first time.
-    /// (The first time of the appender, not the blob)
     ///
     /// # Reference
     ///
@@ -365,8 +359,8 @@ impl AzblobCore {
     pub fn azblob_append_blob_request(
         &self,
         path: &str,
+        position: u64,
         size: u64,
-        position: Option<u64>,
         body: AsyncBody,
     ) -> Result<Request<AsyncBody>> {
         let p = build_abs_path(&self.root, path);
@@ -385,12 +379,7 @@ impl AzblobCore {
 
         req = req.header(CONTENT_LENGTH, size);
 
-        if let Some(pos) = position {
-            req = req.header(
-                HeaderName::from_static(constants::X_MS_BLOB_CONDITION_APPENDPOS),
-                pos.to_string(),
-            );
-        }
+        req = req.header(constants::X_MS_BLOB_CONDITION_APPENDPOS, position);
 
         let req = req.body(body).map_err(new_request_build_error)?;
 
diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs
index 15a63caca..029278ad5 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -16,9 +16,9 @@
 // under the License.
 
 use std::sync::Arc;
-use std::task::{Context, Poll};
 
 use async_trait::async_trait;
+use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::AzblobCore;
@@ -27,34 +27,32 @@ use crate::raw::*;
 use crate::*;
 
 const X_MS_BLOB_TYPE: &str = "x-ms-blob-type";
-const X_MS_BLOB_APPEND_OFFSET: &str = "x-ms-blob-append-offset";
+
+pub type AzblobWriters =
+    oio::TwoWaysWriter<oio::OneShotWriter<AzblobWriter>, oio::AppendObjectWriter<AzblobWriter>>;
 
 pub struct AzblobWriter {
     core: Arc<AzblobCore>,
 
     op: OpWrite,
     path: String,
-
-    position: Option<u64>,
 }
 
 impl AzblobWriter {
     pub fn new(core: Arc<AzblobCore>, op: OpWrite, path: String) -> Self {
-        AzblobWriter {
-            core,
-            op,
-            path,
-            position: None,
-        }
+        AzblobWriter { core, op, path }
     }
+}
 
-    async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> {
+#[async_trait]
+impl oio::OneShotWrite for AzblobWriter {
+    async fn write_once(&self, bs: Bytes) -> Result<()> {
         let mut req = self.core.azblob_put_blob_request(
             &self.path,
-            Some(size),
+            Some(bs.len() as u64),
             self.op.content_type(),
             self.op.cache_control(),
-            body,
+            AsyncBody::Bytes(bs),
         )?;
 
         self.core.sign(&mut req).await?;
@@ -71,13 +69,11 @@ impl AzblobWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
+}
 
-    async fn current_position(&mut self) -> Result<Option<u64>> {
-        if let Some(v) = self.position {
-            return Ok(Some(v));
-        }
-
-        // TODO: we should check with current etag to make sure file not changed.
+#[async_trait]
+impl oio::AppendObjectWrite for AzblobWriter {
+    async fn offset(&self) -> Result<u64> {
         let resp = self
             .core
             .azblob_get_blob_properties(&self.path, None, None)
@@ -86,9 +82,6 @@ impl AzblobWriter {
         let status = resp.status();
 
         match status {
-            // Just check the blob type.
-            // If it is not an appendable blob, return an error.
-            // We can not get the append position of the blob here.
             StatusCode::OK => {
                 let headers = resp.headers();
                 let blob_type = headers.get(X_MS_BLOB_TYPE).and_then(|v| v.to_str().ok());
@@ -98,9 +91,9 @@ impl AzblobWriter {
                         "the blob is not an appendable blob.",
                     ));
                 }
-                Ok(None)
+
+                Ok(parse_content_length(headers)?.unwrap_or_default())
             }
-            // If the blob is not existing, we need to create one.
             StatusCode::NOT_FOUND => {
                 let mut req = self.core.azblob_init_appendable_blob_request(
                     &self.path,
@@ -121,20 +114,16 @@ impl AzblobWriter {
                         return Err(parse_error(resp).await?);
                     }
                 }
-
-                self.position = Some(0);
-                Ok(Some(0))
+                Ok(0)
             }
             _ => Err(parse_error(resp).await?),
         }
     }
 
-    async fn append_oneshot(&mut self, size: u64, body: AsyncBody) -> Result<()> {
-        let _ = self.current_position().await?;
-
-        let mut req =
-            self.core
-                .azblob_append_blob_request(&self.path, size, self.position, body)?;
+    async fn append(&self, offset: u64, size: u64, body: AsyncBody) -> Result<()> {
+        let mut req = self
+            .core
+            .azblob_append_blob_request(&self.path, offset, size, body)?;
 
         self.core.sign(&mut req).await?;
 
@@ -143,33 +132,10 @@ impl AzblobWriter {
         let status = resp.status();
         match status {
             StatusCode::CREATED => {
-                let headers = resp.headers();
-                let position = headers
-                    .get(X_MS_BLOB_APPEND_OFFSET)
-                    .and_then(|v| v.to_str().ok())
-                    .and_then(|v| v.parse::<u64>().ok());
-                self.position = position.map(|v| v + size);
-            }
-            _ => {
-                return Err(parse_error(resp).await?);
+                resp.into_body().consume().await?;
+                Ok(())
             }
+            _ => Err(parse_error(resp).await?),
         }
-
-        Ok(())
-    }
-}
-
-#[async_trait]
-impl oio::Write for AzblobWriter {
-    fn poll_write(&mut self, _: &mut Context<'_>, _: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
-        todo!()
-    }
-
-    fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
-        Poll::Ready(Ok(()))
-    }
-
-    fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
-        Poll::Ready(Ok(()))
     }
 }

Reply via email to