This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch stream-based-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 571570a5429a491c01f515c0cf324ff7d9354c52 Author: Xuanwo <[email protected]> AuthorDate: Wed Aug 30 17:35:39 2023 +0800 Save work Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/logging.rs | 2 +- core/src/layers/timeout.rs | 2 +- core/src/raw/http_util/multipart.rs | 10 +++++----- core/src/raw/oio/stream/api.rs | 4 ++-- core/src/raw/oio/write/at_least_buf_write.rs | 1 - 5 files changed, 9 insertions(+), 10 deletions(-) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index b1fc1ade4..5aa9ffaa7 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1252,7 +1252,7 @@ impl<W> LoggingWriter<W> { #[async_trait] impl<W: oio::Write> oio::Write for LoggingWriter<W> { - async fn write(&mut self, mut s: oio::Streamer) -> Result<()> { + async fn write(&mut self, s: oio::Streamer) -> Result<()> { let size = s.size(); match self.inner.write(s).await { Ok(_) => { diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index 802940ecf..679e05f2b 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -322,7 +322,7 @@ impl<R: oio::Read> oio::Read for TimeoutWrapper<R> { #[async_trait] impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { - async fn write(&mut self, mut s: oio::Streamer) -> Result<()> { + async fn write(&mut self, s: oio::Streamer) -> Result<()> { let timeout = self.io_timeout(s.size()); tokio::time::timeout(timeout, self.inner.write(s)) diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index 77b0709f0..309ae3daa 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -125,7 +125,7 @@ impl<T: Part> Multipart<T> { let mut parts = VecDeque::new(); // Write headers. for v in self.parts.into_iter() { - let mut stream = v.format(); + let stream = v.format(); total_size += stream.size(); parts.push_back(stream); } @@ -154,7 +154,7 @@ impl<T: Part> Multipart<T> { /// This function will make sure content_type and content_length set correctly. pub fn apply(self, mut builder: http::request::Builder) -> Result<Request<AsyncBody>> { let boundary = self.boundary.clone(); - let mut stream = self.build(); + let stream = self.build(); // Insert content type with correct boundary. builder = builder.header( @@ -755,7 +755,7 @@ mod tests { .part(FormDataPart::new("foo").content(Bytes::from("bar"))) .part(FormDataPart::new("hello").content(Bytes::from("world"))); - let mut body = multipart.build(); + let body = multipart.build(); let size = body.size(); let bs = body.collect().await.unwrap(); assert_eq!(size, bs.len() as u64); @@ -917,7 +917,7 @@ Upload to Amazon S3 .content(r#"{"metadata": {"type": "calico"}}"#), ); - let mut body = multipart.build(); + let body = multipart.build(); let size = body.size(); let bs = body.collect().await?; assert_eq!(size, bs.len() as u64); @@ -1024,7 +1024,7 @@ content-length: 32 .header("content-length".parse().unwrap(), "0".parse().unwrap()), ); - let mut body = multipart.build(); + let body = multipart.build(); let size = body.size(); let bs = body.collect().await?; assert_eq!(size, bs.len() as u64); diff --git a/core/src/raw/oio/stream/api.rs b/core/src/raw/oio/stream/api.rs index 79946d847..e59abf92f 100644 --- a/core/src/raw/oio/stream/api.rs +++ b/core/src/raw/oio/stream/api.rs @@ -83,7 +83,7 @@ impl<T: Stream + ?Sized> Stream for Box<T> { impl<T: Stream + ?Sized> Stream for Arc<std::sync::Mutex<T>> { fn size(&self) -> u64 { match self.try_lock() { - Ok(mut this) => this.size(), + Ok(this) => this.size(), Err(_) => panic!("the stream is expected to have only one consumer, but it's not"), } } @@ -112,7 +112,7 @@ impl<T: Stream + ?Sized> Stream for Arc<std::sync::Mutex<T>> { impl<T: Stream + ?Sized> Stream for Arc<tokio::sync::Mutex<T>> { fn size(&self) -> u64 { match self.try_lock() { - Ok(mut this) => this.size(), + Ok(this) => this.size(), Err(_) => panic!("the stream is expected to have only one consumer, but it's not"), } } diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs index 87c5a523f..1a0a2f5c3 100644 --- a/core/src/raw/oio/write/at_least_buf_write.rs +++ b/core/src/raw/oio/write/at_least_buf_write.rs @@ -78,7 +78,6 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { } let buf = self.buffer.clone(); - let buffer_size = buf.len() as u64; let stream = buf.chain(s); self.inner
