This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch buffer-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit c6a5d8ef7d2ad361c5c74a9469cbdbf916689241
Author: Xuanwo <[email protected]>
AuthorDate: Tue Aug 22 15:49:37 2023 +0800

    Refactor
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/cursor.rs                       |  22 ++++
 core/src/raw/oio/write/at_least_buf_write.rs     |  11 ++
 core/src/raw/oio/write/mod.rs                    |   5 +
 core/src/raw/oio/write/multipart_upload_write.rs | 139 ++++-------------------
 core/src/raw/oio/write/one_shot_write.rs         |  69 +++++++++++
 core/src/services/cos/backend.rs                 |   3 +-
 core/src/services/cos/writer.rs                  |  10 +-
 core/src/services/obs/backend.rs                 |   3 +-
 core/src/services/obs/writer.rs                  |  12 +-
 core/src/services/oss/backend.rs                 |   3 +-
 core/src/services/oss/writer.rs                  |  10 +-
 core/src/services/s3/writer.rs                   |  15 +--
 12 files changed, 160 insertions(+), 142 deletions(-)

diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index 1080f78e9..7ef537025 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -45,6 +45,11 @@ impl Cursor {
         let len = self.pos.min(self.inner.len() as u64) as usize;
         &self.inner.as_ref()[len..]
     }
+
+    /// Return the length of the remaining slice (0 when `pos` is past the end).
+    pub fn len(&self) -> usize {
+        self.inner.len() - self.pos.min(self.inner.len() as u64) as usize
+    }
 }
 
 impl From<Bytes> for Cursor {
@@ -148,6 +153,23 @@ impl oio::BlockingRead for Cursor {
     }
 }
 
+impl oio::Stream for Cursor {
+    fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        if self.is_empty() {
+            return Poll::Ready(None);
+        }
+        // Yield only the unread tail so a partially consumed cursor never replays bytes.
+        let bs = self.inner.slice(self.pos.min(self.inner.len() as u64) as usize..);
+        self.pos += bs.len() as u64;
+        Poll::Ready(Some(Ok(bs)))
+    }
+
+    fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        self.pos = 0;
+        Poll::Ready(Ok(()))
+    }
+}
+
 /// # TODO
 ///
 /// we can do some compaction during runtime. For example, merge 4K data
diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs
index 98a237970..91da42bbc 100644
--- a/core/src/raw/oio/write/at_least_buf_write.rs
+++ b/core/src/raw/oio/write/at_least_buf_write.rs
@@ -31,6 +31,17 @@ pub struct AtLeastBufWriter<W: oio::Write> {
     buf: oio::ChunkedCursor,
 }
 
+impl<W: oio::Write> AtLeastBufWriter<W> {
+    /// Create a new at least buf writer over `inner`; `buf` starts as a new `ChunkedCursor`.
+    pub fn new(inner: W, size: usize) -> Self {
+        Self {
+            inner,
+            size,
+            buf: oio::ChunkedCursor::new(),
+        }
+    }
+}
+
 #[async_trait]
 impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs
index fa49ea6e2..7994e2078 100644
--- a/core/src/raw/oio/write/mod.rs
+++ b/core/src/raw/oio/write/mod.rs
@@ -35,3 +35,8 @@ pub use append_object_write::AppendObjectWrite;
 pub use append_object_write::AppendObjectWriter;
 
 mod at_least_buf_write;
+pub use at_least_buf_write::AtLeastBufWriter;
+
+mod one_shot_write;
+pub use one_shot_write::OneShotWrite;
+pub use one_shot_write::OneShotWriter;
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs
index 8ca10c462..c013b24c3 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -21,8 +21,6 @@ use bytes::Bytes;
 use crate::raw::*;
 use crate::*;
 
-const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
-
 /// MultipartUploadWrite is used to implement [`Write`] based on multipart
 /// uploads. By implementing MultipartUploadWrite, services don't need to
 /// care about the details of buffering and uploading parts.
@@ -34,12 +32,6 @@ const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
 /// - Expose `MultipartUploadWriter` as `Accessor::Writer`
 #[async_trait]
 pub trait MultipartUploadWrite: Send + Sync + Unpin {
-    /// write_once write all data at once.
-    ///
-    /// MultipartUploadWriter will call this API when the size of data is
-    /// already known.
-    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()>;
-
     /// initiate_part will call start a multipart upload and return the upload id.
     ///
     /// MultipartUploadWriter will call this when:
@@ -94,39 +86,32 @@ pub struct MultipartUploadPart {
 /// - Allow users to switch to un-buffered mode if users write 16MiB every time.
 pub struct MultipartUploadWriter<W: MultipartUploadWrite> {
     inner: W,
-    total_size: Option<u64>,
 
     upload_id: Option<String>,
     parts: Vec<MultipartUploadPart>,
-    buffer: oio::VectorCursor,
-    buffer_size: usize,
 }
 
 impl<W: MultipartUploadWrite> MultipartUploadWriter<W> {
     /// Create a new MultipartUploadWriter.
-    pub fn new(inner: W, total_size: Option<u64>) -> Self {
+    pub fn new(inner: W) -> Self {
         Self {
             inner,
-            total_size,
 
             upload_id: None,
             parts: Vec::new(),
-            buffer: oio::VectorCursor::new(),
-            buffer_size: DEFAULT_WRITE_MIN_SIZE,
         }
     }
 
-    /// Configure the write_min_size.
-    ///
-    /// write_min_size is used to control the size of internal buffer.
-    ///
-    /// MultipartUploadWriter will flush the buffer to upload a part when
-    /// the size of buffer is larger than write_min_size.
-    ///
-    /// This value is default to 8 MiB (as recommended by AWS).
-    pub fn with_write_min_size(mut self, v: usize) -> Self {
-        self.buffer_size = v;
-        self
+    /// Get the upload id. Initiate a new multipart upload if the upload id is empty.
+    pub async fn upload_id(&mut self) -> Result<String> {
+        // Reuse the cached id when a multipart upload has already been initiated.
+        if let Some(id) = self.upload_id.as_ref() {
+            return Ok(id.clone());
+        }
+
+        let id = self.inner.initiate_part().await?;
+        self.upload_id = Some(id.clone());
+        Ok(id)
     }
 }
 
@@ -136,88 +121,28 @@ where
     W: MultipartUploadWrite,
 {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let upload_id = match &self.upload_id {
-            Some(upload_id) => upload_id,
-            None => {
-                if self.total_size.unwrap_or_default() == bs.len() as u64 {
-                    return self
-                        .inner
-                        .write_once(bs.len() as u64, AsyncBody::Bytes(bs))
-                        .await;
-                }
-
-                let upload_id = self.inner.initiate_part().await?;
-                self.upload_id = Some(upload_id);
-                self.upload_id.as_deref().unwrap()
-            }
-        };
+        let upload_id = self.upload_id().await?;
 
-        // Ignore empty bytes
-        if bs.is_empty() {
-            return Ok(());
-        }
-
-        self.buffer.push(bs);
-        // Return directly if the buffer is not full
-        if self.buffer.len() <= self.buffer_size {
-            return Ok(());
-        }
-
-        let bs = self.buffer.peak_at_least(self.buffer_size);
         let size = bs.len();
 
-        match self
-            .inner
+        self.inner
             .write_part(
-                upload_id,
+                &upload_id,
                 self.parts.len(),
                 size as u64,
                 AsyncBody::Bytes(bs),
             )
             .await
-        {
-            Ok(part) => {
-                self.buffer.take(size);
-                self.parts.push(part);
-                Ok(())
-            }
-            Err(e) => {
-                // If the upload fails, we should pop the given bs to make sure
-                // write is re-enter safe.
-                self.buffer.pop();
-                Err(e)
-            }
-        }
+            .map(|v| self.parts.push(v))
     }
 
     async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        if !self.buffer.is_empty() {
-            return Err(Error::new(
-                ErrorKind::InvalidInput,
-                "Writer::sink should not be used mixed with existing buffer",
-            ));
-        }
-
-        let upload_id = match &self.upload_id {
-            Some(upload_id) => upload_id,
-            None => {
-                if self.total_size.unwrap_or_default() == size {
-                    return self.inner.write_once(size, AsyncBody::Stream(s)).await;
-                }
-
-                let upload_id = self.inner.initiate_part().await?;
-                self.upload_id = Some(upload_id);
-                self.upload_id.as_deref().unwrap()
-            }
-        };
+        let upload_id = self.upload_id().await?;
 
-        let part = self
-            .inner
-            .write_part(upload_id, self.parts.len(), size, AsyncBody::Stream(s))
-            .await?;
-        self.parts.push(part);
-
-        Ok(())
+        self.inner
+            .write_part(&upload_id, self.parts.len(), size, AsyncBody::Stream(s))
+            .await
+            .map(|v| self.parts.push(v))
     }
 
     async fn close(&mut self) -> Result<()> {
@@ -227,30 +152,6 @@ where
             return Ok(());
         };
 
-        // Make sure internal buffer has been flushed.
-        if !self.buffer.is_empty() {
-            let bs = self.buffer.peak_exact(self.buffer.len());
-
-            match self
-                .inner
-                .write_part(
-                    upload_id,
-                    self.parts.len(),
-                    bs.len() as u64,
-                    AsyncBody::Bytes(bs),
-                )
-                .await
-            {
-                Ok(part) => {
-                    self.buffer.clear();
-                    self.parts.push(part);
-                }
-                Err(e) => {
-                    return Err(e);
-                }
-            }
-        }
-
         self.inner.complete_part(upload_id, &self.parts).await
     }
 
diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs
new file mode 100644
index 000000000..c2ad25c83
--- /dev/null
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::*;
+use crate::*;
+use async_trait::async_trait;
+use bytes::Bytes;
+
+/// OneShotWrite is used to implement [`Write`] based on one shot operation.
+/// By implementing OneShotWrite, services don't need to care about the details.
+///
+/// For example, S3 `PUT Object` and fs `write_all`.
+///
+/// Wrap an implementation in [`OneShotWriter`] to obtain an `oio::Write`.
+#[async_trait]
+pub trait OneShotWrite: Send + Sync + Unpin {
+    /// write_once writes all data at once.
+    ///
+    /// Implementations should make sure that the data is written correctly at once.
+    async fn write_once(&self, size: u64, stream: oio::Streamer) -> Result<()>;
+}
+
+/// OneShotWriter implements `oio::Write` by forwarding each call to a [`OneShotWrite`].
+pub struct OneShotWriter<W: OneShotWrite> {
+    inner: W,
+}
+
+impl<W: OneShotWrite> OneShotWriter<W> {
+    /// Create a new one shot writer that wraps `inner`.
+    pub fn new(inner: W) -> Self {
+        Self { inner }
+    }
+}
+
+#[async_trait]
+impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        let cursor = oio::Cursor::from(bs);
+        let size = cursor.len() as u64;
+        let stream: oio::Streamer = Box::new(cursor);
+        self.inner.write_once(size, stream).await
+    }
+
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+        self.inner.write_once(size, s).await
+    }
+    // Nothing is buffered or left in flight here, so abort has nothing to undo.
+    async fn abort(&mut self) -> Result<()> {
+        Ok(())
+    }
+    // Each write is completed inside `write`/`sink`, so close has nothing to flush.
+    async fn close(&mut self) -> Result<()> {
+        Ok(())
+    }
+}
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 9dc4b1b66..62a968ea0 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -364,8 +364,7 @@ impl Accessor for CosBackend {
 
             oio::TwoWaysWriter::Right(w)
         } else {
-            let w = oio::MultipartUploadWriter::new(writer, args.content_length())
-                .with_write_min_size(self.core.write_min_size);
+            let w = oio::MultipartUploadWriter::new(writer);
 
             oio::TwoWaysWriter::Left(w)
         };
diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs
index e64bc23a1..cdcbb9b18 100644
--- a/core/src/services/cos/writer.rs
+++ b/core/src/services/cos/writer.rs
@@ -23,6 +23,7 @@ use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
+use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
@@ -44,15 +45,15 @@ impl CosWriter {
 }
 
 #[async_trait]
-impl oio::MultipartUploadWrite for CosWriter {
-    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+impl oio::OneShotWrite for CosWriter {
+    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
         let mut req = self.core.cos_put_object_request(
             &self.path,
             Some(size),
             self.op.content_type(),
             self.op.content_disposition(),
             self.op.cache_control(),
-            body,
+            AsyncBody::Stream(stream),
         )?;
 
         self.core.sign(&mut req).await?;
@@ -69,7 +70,10 @@ impl oio::MultipartUploadWrite for CosWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
+}
 
+#[async_trait]
+impl oio::MultipartUploadWrite for CosWriter {
     async fn initiate_part(&self) -> Result<String> {
         let resp = self
             .core
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index e90308eda..ce2e83057 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -458,8 +458,7 @@ impl Accessor for ObsBackend {
 
             oio::TwoWaysWriter::Right(w)
         } else {
-            let w = oio::MultipartUploadWriter::new(writer, args.content_length())
-                .with_write_min_size(self.core.write_min_size);
+            let w = oio::MultipartUploadWriter::new(writer);
 
             oio::TwoWaysWriter::Left(w)
         };
diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs
index efcfd3ab6..893a098d7 100644
--- a/core/src/services/obs/writer.rs
+++ b/core/src/services/obs/writer.rs
@@ -23,7 +23,7 @@ use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
-use crate::raw::oio::MultipartUploadPart;
+use crate::raw::oio::{MultipartUploadPart, Streamer};
 use crate::raw::*;
 use crate::*;
 
@@ -43,15 +43,16 @@ impl ObsWriter {
         }
     }
 }
+
 #[async_trait]
-impl oio::MultipartUploadWrite for ObsWriter {
-    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+impl oio::OneShotWrite for ObsWriter {
+    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
         let mut req = self.core.obs_put_object_request(
             &self.path,
             Some(size),
             self.op.content_type(),
             self.op.cache_control(),
-            body,
+            AsyncBody::Stream(stream),
         )?;
 
         self.core.sign(&mut req).await?;
@@ -68,7 +69,10 @@ impl oio::MultipartUploadWrite for ObsWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
+}
 
+#[async_trait]
+impl oio::MultipartUploadWrite for ObsWriter {
     async fn initiate_part(&self) -> Result<String> {
         let resp = self
             .core
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 6704d158b..1b6d1f33e 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -486,8 +486,7 @@ impl Accessor for OssBackend {
 
             oio::TwoWaysWriter::Right(w)
         } else {
-            let w = oio::MultipartUploadWriter::new(writer, args.content_length())
-                .with_write_min_size(self.core.write_min_size);
+            let w = oio::MultipartUploadWriter::new(writer);
 
             oio::TwoWaysWriter::Left(w)
         };
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index 9faad249f..ff9c8f9d2 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -23,6 +23,7 @@ use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
+use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
@@ -44,15 +45,15 @@ impl OssWriter {
 }
 
 #[async_trait]
-impl oio::MultipartUploadWrite for OssWriter {
-    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+impl oio::OneShotWrite for OssWriter {
+    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
         let mut req = self.core.oss_put_object_request(
             &self.path,
             Some(size),
             self.op.content_type(),
             self.op.content_disposition(),
             self.op.cache_control(),
-            body,
+            AsyncBody::Stream(stream),
             false,
         )?;
 
@@ -70,7 +71,10 @@ impl oio::MultipartUploadWrite for OssWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
+}
 
+#[async_trait]
+impl oio::MultipartUploadWrite for OssWriter {
     async fn initiate_part(&self) -> Result<String> {
         let resp = self
             .core
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index 508716d14..c323dd526 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -23,6 +23,7 @@ use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
+use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
@@ -35,29 +36,26 @@ pub struct S3Writer {
 
 impl S3Writer {
     pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> oio::MultipartUploadWriter<Self> {
-        let write_min_size = core.write_min_size;
-
-        let total_size = op.content_length();
         let s3_writer = S3Writer {
             core,
             path: path.to_string(),
             op,
         };
 
-        oio::MultipartUploadWriter::new(s3_writer, total_size).with_write_min_size(write_min_size)
+        oio::MultipartUploadWriter::new(s3_writer)
     }
 }
 
 #[async_trait]
-impl oio::MultipartUploadWrite for S3Writer {
-    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+impl oio::OneShotWrite for S3Writer {
+    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
         let mut req = self.core.s3_put_object_request(
             &self.path,
             Some(size),
             self.op.content_type(),
             self.op.content_disposition(),
             self.op.cache_control(),
-            body,
+            AsyncBody::Stream(stream),
         )?;
 
         self.core.sign(&mut req).await?;
@@ -74,7 +72,10 @@ impl oio::MultipartUploadWrite for S3Writer {
             _ => Err(parse_error(resp).await?),
         }
     }
+}
 
+#[async_trait]
+impl oio::MultipartUploadWrite for S3Writer {
     async fn initiate_part(&self) -> Result<String> {
         let resp = self
             .core

Reply via email to