This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch refactor-writer in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit ff42dfae99cdea8218970a3db85e4a6f3cfe9189 Author: Xuanwo <[email protected]> AuthorDate: Mon Sep 4 15:55:16 2023 +0800 Save work Signed-off-by: Xuanwo <[email protected]> --- core/benches/oio/utils.rs | 3 +- core/src/layers/complete.rs | 2 +- core/src/layers/concurrent_limit.rs | 2 +- core/src/layers/error_context.rs | 2 +- core/src/layers/logging.rs | 2 +- core/src/layers/madsim.rs | 2 +- core/src/layers/metrics.rs | 2 +- core/src/layers/minitrace.rs | 2 +- core/src/layers/oteltrace.rs | 2 +- core/src/layers/prometheus.rs | 2 +- core/src/layers/retry.rs | 14 +-- core/src/layers/throttle.rs | 3 +- core/src/layers/timeout.rs | 2 +- core/src/layers/tracing.rs | 2 +- core/src/raw/adapters/kv/backend.rs | 2 +- core/src/raw/adapters/typed_kv/backend.rs | 2 +- core/src/raw/oio/cursor.rs | 14 +++ core/src/raw/oio/read/cloneable_read.rs | 137 +++++++++++++++++++++++ core/src/raw/oio/read/into_read_from_file.rs | 2 +- core/src/raw/oio/read/into_read_from_stream.rs | 83 ++++++++++++++ core/src/raw/oio/read/mod.rs | 10 ++ core/src/raw/oio/stream/api.rs | 12 ++ core/src/raw/oio/write/api.rs | 6 +- core/src/raw/oio/write/append_object_write.rs | 5 +- core/src/raw/oio/write/at_least_buf_write.rs | 32 +----- core/src/raw/oio/write/compose_write.rs | 5 +- core/src/raw/oio/write/exact_buf_write.rs | 55 ++------- core/src/raw/oio/write/multipart_upload_write.rs | 9 +- core/src/raw/oio/write/one_shot_write.rs | 4 +- core/src/services/azblob/writer.rs | 8 +- core/src/services/azdfs/writer.rs | 2 +- core/src/services/dropbox/writer.rs | 2 +- core/src/services/fs/writer.rs | 2 +- core/src/services/ftp/writer.rs | 2 +- core/src/services/gcs/writer.rs | 5 +- core/src/services/gdrive/writer.rs | 2 +- core/src/services/ghac/writer.rs | 2 +- core/src/services/hdfs/writer.rs | 2 +- core/src/services/ipmfs/writer.rs | 2 +- core/src/services/onedrive/writer.rs | 2 +- core/src/services/sftp/writer.rs | 2 +- core/src/services/supabase/writer.rs | 2 +- 
core/src/services/vercel_artifacts/writer.rs | 2 +- core/src/services/wasabi/writer.rs | 2 +- core/src/services/webdav/writer.rs | 5 +- core/src/services/webhdfs/writer.rs | 2 +- core/src/types/writer.rs | 10 +- 47 files changed, 334 insertions(+), 142 deletions(-) diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs index 88e764c87..53aa665ca 100644 --- a/core/benches/oio/utils.rs +++ b/core/benches/oio/utils.rs @@ -18,7 +18,6 @@ use async_trait::async_trait; use bytes::Bytes; use opendal::raw::oio; -use opendal::raw::oio::Streamer; use rand::prelude::ThreadRng; use rand::RngCore; @@ -31,7 +30,7 @@ impl oio::Write for BlackHoleWriter { Ok(bs.len() as u64) } - async fn pipe(&mut self, size: u64, _: Streamer) -> opendal::Result<u64> { + async fn pipe(&mut self, size: u64, _: oio::Reader) -> opendal::Result<u64> { Ok(size) } diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 21efc47f8..6ebd5c23c 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -734,7 +734,7 @@ where Ok(n as u64) } - async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { if let Some(total_size) = self.size { if self.written + size > total_size { return Err(Error::new( diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index 7384547f6..c1504e6a2 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -293,7 +293,7 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> { self.inner.abort().await } - async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { self.inner.pipe(size, s).await } diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index 536b7c956..33cebe92c 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ 
-419,7 +419,7 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> { }) } - async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { self.inner.pipe(size, s).await.map_err(|err| { err.with_operation(WriteOperation::Pipe) .with_context("service", self.scheme) diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index 66708ec57..5ed80a28b 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1285,7 +1285,7 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> { } } - async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { match self.inner.pipe(size, s).await { Ok(n) => { self.written += n; diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index 7dc193cb7..55741389d 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -318,7 +318,7 @@ impl oio::Write for MadsimWriter { } } - async fn pipe(&mut self, size: u64, s: oio::Streamer) -> crate::Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Reader) -> crate::Result<u64> { Err(Error::new( ErrorKind::Unsupported, "will be supported in the future", diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index 2439f0221..a96e83f8a 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -861,7 +861,7 @@ impl<R: oio::Write> oio::Write for MetricWrapper<R> { }) } - async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { self.inner .pipe(size, s) .await diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index 65f73ecd0..f6487aa98 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -347,7 +347,7 @@ impl<R: oio::Write> oio::Write for MinitraceWrapper<R> { .await } - async fn pipe(&mut self, size: u64, s: 
oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { self.inner .pipe(size, s) .in_span(Span::enter_with_parent( diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index a439d79b3..9bd464be5 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -317,7 +317,7 @@ impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> { self.inner.write(bs).await } - async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { self.inner.pipe(size, s).await } diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 4fec394de..70aae731e 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -679,7 +679,7 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { }) } - async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { self.inner .pipe(size, s) .await diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index 536860cde..d5c8a0738 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -34,7 +34,6 @@ use backon::Retryable; use bytes::Bytes; use futures::FutureExt; use log::warn; -use tokio::sync::Mutex; use crate::raw::oio::PageOperation; use crate::raw::oio::ReadOperation; @@ -919,8 +918,8 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { /// The overhead is constant, which means the overhead will not increase with the size of /// stream. For example, if every `next` call cost 1ms, then the overhead will only take 0.005% /// which is acceptable. 
- async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { - let s = Arc::new(Mutex::new(s)); + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { + let s = oio::into_cloneable_reader_within_tokio(s); let mut backoff = self.builder.build(); @@ -932,13 +931,14 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { None => return Err(e), Some(dur) => { { - use oio::StreamExt; + use oio::ReadExt; + let s = s.clone().into_inner(); let mut stream = s.lock().await; - // Try to reset this stream. + // Try to reset this reader. // - // If error happened, we will return the sink error directly and stop retry. - if stream.reset().await.is_err() { + // If error happened, we will return the pipe error directly and stop retry. + if stream.seek(io::SeekFrom::Start(0)).await.is_err() { return Err(e); } } diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs index aea598c3a..d821aacbf 100644 --- a/core/src/layers/throttle.rs +++ b/core/src/layers/throttle.rs @@ -33,7 +33,6 @@ use governor::NegativeMultiDecision; use governor::Quota; use governor::RateLimiter; -use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; @@ -242,7 +241,7 @@ impl<R: oio::Write> oio::Write for ThrottleWrapper<R> { } } - async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { self.inner.pipe(size, s).await } diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index 421202023..642b8a49b 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -335,7 +335,7 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { })? 
} - async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { let timeout = self.io_timeout(size); tokio::time::timeout(timeout, self.inner.pipe(size, s)) diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index 002fe314f..467d0a153 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -332,7 +332,7 @@ impl<R: oio::Write> oio::Write for TracingWrapper<R> { parent = &self.span, level = "trace", skip_all)] - async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { self.inner.pipe(size, s).await } diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index f3406b8ae..ce9ad3005 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -397,7 +397,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> { Ok(size as u64) } - async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index aeaab9864..494118e84 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -410,7 +410,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> { Ok(size as u64) } - async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs index 796b2a349..88a713068 100644 --- a/core/src/raw/oio/cursor.rs +++ b/core/src/raw/oio/cursor.rs @@ -343,6 +343,20 @@ impl oio::Stream for ChunkedCursor { } } +impl oio::Read 
for ChunkedCursor { + fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> { + todo!() + } + + fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> { + todo!() + } + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { + todo!() + } +} + /// VectorCursor is the cursor for [`Vec<Bytes>`] that implements [`oio::Stream`] pub struct VectorCursor { inner: VecDeque<Bytes>, diff --git a/core/src/raw/oio/read/cloneable_read.rs b/core/src/raw/oio/read/cloneable_read.rs new file mode 100644 index 000000000..2ccd50042 --- /dev/null +++ b/core/src/raw/oio/read/cloneable_read.rs @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::raw::*; +use crate::*; +use bytes::Bytes; +use std::io::SeekFrom; +use std::sync::Arc; +use std::task::{Context, Poll}; + +/// Convert given reader into a wrapper with `std::sync::Mutex` for `Send + Sync + Clone`. +pub fn into_cloneable_reader_within_std<R>(reader: R) -> CloneableReaderWithinStd<R> { + CloneableReaderWithinStd(Arc::new(std::sync::Mutex::new(reader))) +} + +/// CloneableReaderWithinStd is a Send + Sync + Clone with `std::sync::Mutex` wrapper of input +/// reader. 
+/// +/// Callers can clone this reader, but only one of them may call the `oio::Read` API at a +/// time; otherwise, a call that fails to acquire the lock without blocking returns an error. +pub struct CloneableReaderWithinStd<R>(Arc<std::sync::Mutex<R>>); + +impl<R> CloneableReaderWithinStd<R> { + /// Consume self to get the shared, mutex-protected inner reader. + pub fn into_inner(self) -> Arc<std::sync::Mutex<R>> { + self.0 + } +} + +impl<R> Clone for CloneableReaderWithinStd<R> { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl<R: oio::Read> oio::Read for CloneableReaderWithinStd<R> { + fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> { + match self.0.try_lock() { + Ok(mut this) => this.poll_read(cx, buf), + Err(_) => Poll::Ready(Err(Error::new( + ErrorKind::Unexpected, + "the cloneable reader is expected to have only one owner, but it's not", + ))), + } + } + + fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> { + match self.0.try_lock() { + Ok(mut this) => this.poll_seek(cx, pos), + Err(_) => Poll::Ready(Err(Error::new( + ErrorKind::Unexpected, + "the cloneable reader is expected to have only one owner, but it's not", + ))), + } + } + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { + match self.0.try_lock() { + Ok(mut this) => this.poll_next(cx), + Err(_) => Poll::Ready(Some(Err(Error::new( + ErrorKind::Unexpected, + "the cloneable reader is expected to have only one owner, but it's not", + )))), + } + } +} + +/// Convert given reader into a wrapper with `tokio::sync::Mutex` for `Send + Sync + Clone`. +pub fn into_cloneable_reader_within_tokio<R>(reader: R) -> CloneableReaderWithinTokio<R> { + CloneableReaderWithinTokio(Arc::new(tokio::sync::Mutex::new(reader))) +} + +/// CloneableReaderWithinTokio is a `Send + Sync + Clone` wrapper of the input reader, +/// backed by `tokio::sync::Mutex`.
+/// +/// Callers can clone this reader, but only one of them may call the `oio::Read` API at a +/// time; otherwise, a call that fails to acquire the lock without blocking returns an error. +pub struct CloneableReaderWithinTokio<R>(Arc<tokio::sync::Mutex<R>>); + +impl<R> CloneableReaderWithinTokio<R> { + /// Consume self to get the shared, mutex-protected inner reader. + pub fn into_inner(self) -> Arc<tokio::sync::Mutex<R>> { + self.0 + } +} + +impl<R> Clone for CloneableReaderWithinTokio<R> { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl<R: oio::Read> oio::Read for CloneableReaderWithinTokio<R> { + fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> { + match self.0.try_lock() { + Ok(mut this) => this.poll_read(cx, buf), + Err(_) => Poll::Ready(Err(Error::new( + ErrorKind::Unexpected, + "the cloneable reader is expected to have only one owner, but it's not", + ))), + } + } + + fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> { + match self.0.try_lock() { + Ok(mut this) => this.poll_seek(cx, pos), + Err(_) => Poll::Ready(Err(Error::new( + ErrorKind::Unexpected, + "the cloneable reader is expected to have only one owner, but it's not", + ))), + } + } + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { + match self.0.try_lock() { + Ok(mut this) => this.poll_next(cx), + Err(_) => Poll::Ready(Some(Err(Error::new( + ErrorKind::Unexpected, + "the cloneable reader is expected to have only one owner, but it's not", + )))), + } + } +} diff --git a/core/src/raw/oio/read/into_read_from_file.rs b/core/src/raw/oio/read/into_read_from_file.rs index 96e80f515..6cc28f2d8 100644 --- a/core/src/raw/oio/read/into_read_from_file.rs +++ b/core/src/raw/oio/read/into_read_from_file.rs @@ -41,7 +41,7 @@ pub fn into_read_from_file<R>(fd: R, start: u64, end: u64) -> FromFileReader<R> } } -/// FdReader is a wrapper of input fd to implement [`oio::Read`]. +/// FromFileReader is a wrapper of input fd to implement [`oio::Read`].
pub struct FromFileReader<R> { inner: R, diff --git a/core/src/raw/oio/read/into_read_from_stream.rs b/core/src/raw/oio/read/into_read_from_stream.rs new file mode 100644 index 000000000..0a231de2b --- /dev/null +++ b/core/src/raw/oio/read/into_read_from_stream.rs @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::raw::*; +use crate::*; +use bytes::{Buf, Bytes}; +use futures::StreamExt; +use std::io::SeekFrom; +use std::task::{Context, Poll}; + +/// Convert given stream `futures::Stream<Item = Result<Bytes>>` into [`oio::Reader`]. 
+pub fn into_read_from_stream<S>(stream: S) -> FromStreamReader<S> { + FromStreamReader { + inner: stream, + buf: Bytes::new(), + } +} + +/// FromStreamReader will convert a `futures::Stream<Item = Result<Bytes>>` into `oio::Read` +pub struct FromStreamReader<S> { + inner: S, + buf: Bytes, +} + +impl<S, T> oio::Read for FromStreamReader<S> +where + S: futures::Stream<Item = Result<T>> + Send + Sync + Unpin + 'static, + T: Into<Bytes>, +{ + fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> { + if !self.buf.is_empty() { + let len = std::cmp::min(buf.len(), self.buf.len()); + buf[..len].copy_from_slice(&self.buf[..len]); + self.buf.advance(len); + return Poll::Ready(Ok(len)); + } + + match futures::ready!(self.inner.poll_next_unpin(cx)) { + Some(Ok(bytes)) => { + let bytes = bytes.into(); + let len = std::cmp::min(buf.len(), bytes.len()); + buf[..len].copy_from_slice(&bytes[..len]); + self.buf = bytes.slice(len..); + Poll::Ready(Ok(len)) + } + Some(Err(err)) => Poll::Ready(Err(err)), + None => Poll::Ready(Ok(0)), + } + } + + fn poll_seek(&mut self, _: &mut Context<'_>, _: SeekFrom) -> Poll<Result<u64>> { + Poll::Ready(Err(Error::new( + ErrorKind::Unsupported, + "FromStreamReader can't support operation", + ))) + } + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { + if !self.buf.is_empty() { + return Poll::Ready(Some(Ok(std::mem::take(&mut self.buf)))); + } + + match futures::ready!(self.inner.poll_next_unpin(cx)) { + Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.into()))), + Some(Err(err)) => Poll::Ready(Some(Err(err))), + None => Poll::Ready(None), + } + } +} diff --git a/core/src/raw/oio/read/mod.rs b/core/src/raw/oio/read/mod.rs index 64466bbdd..841dfdd1a 100644 --- a/core/src/raw/oio/read/mod.rs +++ b/core/src/raw/oio/read/mod.rs @@ -34,3 +34,13 @@ pub use into_seekable_read_by_range::ByRangeSeekableReader; mod into_read_from_file; pub use into_read_from_file::into_read_from_file; pub use 
into_read_from_file::FromFileReader; + +mod into_read_from_stream; +pub use into_read_from_stream::into_read_from_stream; +pub use into_read_from_stream::FromStreamReader; + +mod cloneable_read; +pub use cloneable_read::into_cloneable_reader_within_std; +pub use cloneable_read::into_cloneable_reader_within_tokio; +pub use cloneable_read::CloneableReaderWithinStd; +pub use cloneable_read::CloneableReaderWithinTokio; diff --git a/core/src/raw/oio/stream/api.rs b/core/src/raw/oio/stream/api.rs index 132fb78fc..9edfceb6a 100644 --- a/core/src/raw/oio/stream/api.rs +++ b/core/src/raw/oio/stream/api.rs @@ -69,6 +69,18 @@ impl<T: Stream + ?Sized> Stream for Box<T> { } } +impl Stream for dyn raw::oio::Read { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { + raw::oio::Read::poll_next(self, cx) + } + + fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { + let _ = raw::oio::Read::poll_seek(self, cx, std::io::SeekFrom::Start(0))?; + + Poll::Ready(Ok(())) + } +} + impl<T: Stream + ?Sized> Stream for Arc<std::sync::Mutex<T>> { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { match self.try_lock() { diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs index 2bdcd426b..ba5a2b675 100644 --- a/core/src/raw/oio/write/api.rs +++ b/core/src/raw/oio/write/api.rs @@ -96,7 +96,7 @@ pub trait Write: Unpin + Send + Sync { /// /// It's possible that `n < size`, caller should pass the remaining bytes /// repeatedly until all bytes has been written. - async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64>; + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64>; /// Abort the pending writer. 
async fn abort(&mut self) -> Result<()>; @@ -113,7 +113,7 @@ impl Write for () { unimplemented!("write is required to be implemented for oio::Write") } - async fn pipe(&mut self, _: u64, _: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _: u64, _: oio::Reader) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "output writer doesn't support sink", @@ -144,7 +144,7 @@ impl<T: Write + ?Sized> Write for Box<T> { (**self).write(bs).await } - async fn pipe(&mut self, n: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, n: u64, s: oio::Reader) -> Result<u64> { (**self).pipe(n, s).await } diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs index 473d4feac..0e5fd9ed3 100644 --- a/core/src/raw/oio/write/append_object_write.rs +++ b/core/src/raw/oio/write/append_object_write.rs @@ -18,7 +18,6 @@ use async_trait::async_trait; use bytes::Bytes; -use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; @@ -92,11 +91,11 @@ where Ok(size) } - async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { let offset = self.offset().await?; self.inner - .append(offset, size, AsyncBody::Stream(s)) + .append(offset, size, AsyncBody::Stream(Box::new(s))) .await .map(|_| self.offset = Some(offset + size))?; diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs index 62159ea28..99c20ee46 100644 --- a/core/src/raw/oio/write/at_least_buf_write.rs +++ b/core/src/raw/oio/write/at_least_buf_write.rs @@ -18,8 +18,6 @@ use async_trait::async_trait; use bytes::Bytes; -use crate::raw::oio::StreamExt; -use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; @@ -92,34 +90,8 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { }) } - async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> { - // If total size is known and equals to given stream, we can write 
it directly. - if let Some(total_size) = self.total_size { - if total_size == size { - return self.inner.pipe(size, s).await; - } - } - - // Push the bytes into the buffer if the buffer is not full. - if self.buffer.len() as u64 + size < self.buffer_size as u64 { - let bs = s.collect().await?; - let size = bs.len() as u64; - self.buffer.push(bs); - return Ok(size); - } - - let buf = self.buffer.clone(); - let buffer_size = buf.len() as u64; - let stream = buf.chain(s); - - self.inner - .pipe(buffer_size + size, Box::new(stream)) - .await - // Clear buffer if the write is successful. - .map(|v| { - self.buffer.clear(); - v - }) + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { + todo!() } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs index 287db9bad..05dfdd775 100644 --- a/core/src/raw/oio/write/compose_write.rs +++ b/core/src/raw/oio/write/compose_write.rs @@ -41,7 +41,6 @@ use async_trait::async_trait; use bytes::Bytes; -use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; @@ -64,7 +63,7 @@ impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> { } } - async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { match self { Self::One(one) => one.pipe(size, s).await, Self::Two(two) => two.pipe(size, s).await, @@ -110,7 +109,7 @@ impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write } } - async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { match self { Self::One(one) => one.pipe(size, s).await, Self::Two(two) => two.pipe(size, s).await, diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs index 7c597e7d1..e77330e0b 100644 --- a/core/src/raw/oio/write/exact_buf_write.rs +++ 
b/core/src/raw/oio/write/exact_buf_write.rs @@ -92,50 +92,8 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { /// # TODO /// /// We know every stream size, we can collect them into a buffer without chain them every time. - async fn pipe(&mut self, _: u64, mut s: Streamer) -> Result<u64> { - if self.buffer.len() >= self.buffer_size { - let mut buf = self.buffer.clone(); - let to_write = buf.split_to(self.buffer_size); - return self - .inner - .pipe(to_write.len() as u64, Box::new(to_write)) - .await - // Replace buffer with remaining if the write is successful. - .map(|v| { - self.buffer = buf; - self.chain_stream(s); - v - }); - } - - let mut buf = self.buffer.clone(); - while buf.len() < self.buffer_size { - let bs = self.next_bytes(&mut s).await.transpose()?; - match bs { - None => break, - Some(bs) => buf.push(bs), - } - } - - // Return directly if the buffer is not full. - // - // We don't need to chain stream here because it must be consumed. - if buf.len() < self.buffer_size { - let size = buf.len() as u64; - self.buffer = buf; - return Ok(size); - } - - let to_write = buf.split_to(self.buffer_size); - self.inner - .pipe(to_write.len() as u64, Box::new(to_write)) - .await - // Replace buffer with remaining if the write is successful. 
- .map(|v| { - self.buffer = buf; - self.chain_stream(s); - v - }) + async fn pipe(&mut self, _: u64, mut s: oio::Reader) -> Result<u64> { + todo!() } async fn abort(&mut self) -> Result<()> { @@ -187,6 +145,7 @@ impl<W: oio::Write> oio::Write for ExactBufWriter<W> { #[cfg(test)] mod tests { + use futures::AsyncReadExt; use log::debug; use pretty_assertions::assert_eq; use rand::thread_rng; @@ -196,7 +155,6 @@ mod tests { use sha2::Sha256; use super::*; - use crate::raw::oio::StreamExt; use crate::raw::oio::Write; struct MockWriter { @@ -212,10 +170,11 @@ mod tests { Ok(bs.len() as u64) } - async fn pipe(&mut self, size: u64, s: Streamer) -> Result<u64> { - let bs = s.collect().await?; + async fn pipe(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> { + let mut bs = vec![]; + s.read_to_end(&mut bs).await?; assert_eq!(bs.len() as u64, size); - self.write(bs).await + self.write(bs.into()).await } async fn abort(&mut self) -> Result<()> { diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index 232550607..0b314c163 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -138,11 +138,16 @@ where Ok(size as u64) } - async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { let upload_id = self.upload_id().await?; self.inner - .write_part(&upload_id, self.parts.len(), size, AsyncBody::Stream(s)) + .write_part( + &upload_id, + self.parts.len(), + size, + AsyncBody::Stream(Box::new(s)), + ) .await .map(|v| self.parts.push(v))?; diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index 11dbb501e..a85feecf3 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -58,8 +58,8 @@ impl<W: OneShotWrite> oio::Write for OneShotWriter<W> { Ok(size) } - async fn pipe(&mut self, 
size: u64, s: oio::Streamer) -> Result<u64> { - self.inner.write_once(size, s).await?; + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { + self.inner.write_once(size, Box::new(s)).await?; Ok(size) } diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index dfd4e378a..11776755f 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -180,9 +180,10 @@ impl oio::Write for AzblobWriter { Ok(size) } - async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { if self.op.append() { - self.append_oneshot(size, AsyncBody::Stream(s)).await?; + self.append_oneshot(size, AsyncBody::Stream(Box::new(s))) + .await?; } else { if self.op.content_length().is_none() { return Err(Error::new( @@ -191,7 +192,8 @@ impl oio::Write for AzblobWriter { )); } - self.write_oneshot(size, AsyncBody::Stream(s)).await?; + self.write_oneshot(size, AsyncBody::Stream(Box::new(s))) + .await?; } Ok(size) diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs index a03cec2a1..4b9b90f9f 100644 --- a/core/src/services/azdfs/writer.rs +++ b/core/src/services/azdfs/writer.rs @@ -88,7 +88,7 @@ impl oio::Write for AzdfsWriter { } } - async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs index 1405fe224..37abab6a9 100644 --- a/core/src/services/dropbox/writer.rs +++ b/core/src/services/dropbox/writer.rs @@ -62,7 +62,7 @@ impl oio::Write for DropboxWriter { } } - async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not 
supported", diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 80637f068..ab27f2b65 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -67,7 +67,7 @@ impl oio::Write for FsWriter<tokio::fs::File> { Ok(size) } - async fn pipe(&mut self, size: u64, mut s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> { while let Some(bs) = s.next().await { let bs = bs?; self.f diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index 3ce495795..3bb11582d 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -55,7 +55,7 @@ impl oio::Write for FtpWriter { Ok(size as u64) } - async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index 305624ed8..e85605fa4 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -167,8 +167,9 @@ impl oio::Write for GcsWriter { } } - async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { - self.write_oneshot(size, AsyncBody::Stream(s)).await?; + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { + self.write_oneshot(size, AsyncBody::Stream(Box::new(s))) + .await?; Ok(size) } diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index cfe027b6c..df974d940 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -106,7 +106,7 @@ impl oio::Write for GdriveWriter { Ok(size) } - async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> { Err(Error::new(ErrorKind::Unsupported, "sink is not supported")) } diff --git 
a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index 59084c70e..bf9116ec2 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -62,7 +62,7 @@ impl oio::Write for GhacWriter { } } - async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 4b05c08de..3b60a4df3 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -58,7 +58,7 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> { Ok(size as u64) } - async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index 108f67e39..1434e980a 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -53,7 +53,7 @@ impl oio::Write for IpmfsWriter { } } - async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index edf55c127..75a5e023a 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -58,7 +58,7 @@ impl oio::Write for OneDriveWriter { Ok(size as u64) } - async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/sftp/writer.rs 
b/core/src/services/sftp/writer.rs index 9f9e1f3dc..5ee5d84ac 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -43,7 +43,7 @@ impl oio::Write for SftpWriter { Ok(size) } - async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs index ee3ee8251..36a572bfe 100644 --- a/core/src/services/supabase/writer.rs +++ b/core/src/services/supabase/writer.rs @@ -77,7 +77,7 @@ impl oio::Write for SupabaseWriter { Ok(size as u64) } - async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs index b2cf603de..936cddf01 100644 --- a/core/src/services/vercel_artifacts/writer.rs +++ b/core/src/services/vercel_artifacts/writer.rs @@ -62,7 +62,7 @@ impl oio::Write for VercelArtifactsWriter { } } - async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs index c0ddf30b1..a39c835dc 100644 --- a/core/src/services/wasabi/writer.rs +++ b/core/src/services/wasabi/writer.rs @@ -65,7 +65,7 @@ impl oio::Write for WasabiWriter { } } - async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/services/webdav/writer.rs 
b/core/src/services/webdav/writer.rs index 1b6e7cfa5..bbe79eb31 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -70,8 +70,9 @@ impl oio::Write for WebdavWriter { Ok(size) } - async fn pipe(&mut self, size: u64, s: oio::Streamer) -> Result<u64> { - self.write_oneshot(size, AsyncBody::Stream(s)).await?; + async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> { + self.write_oneshot(size, AsyncBody::Stream(Box::new(s))) + .await?; Ok(size) } diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index dbb4b409d..ae3396dc3 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -64,7 +64,7 @@ impl oio::Write for WebhdfsWriter { } } - async fn pipe(&mut self, _size: u64, _s: oio::Streamer) -> Result<u64> { + async fn pipe(&mut self, _size: u64, _s: oio::Reader) -> Result<u64> { Err(Error::new( ErrorKind::Unsupported, "Write::sink is not supported", diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index eef54f164..212aba528 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -138,8 +138,8 @@ impl Writer { T: Into<Bytes>, { if let State::Idle(Some(w)) = &mut self.state { - let s = Box::new(oio::into_stream(sink_from.map_ok(|v| v.into()))); - w.pipe(size, s).await + let r = Box::new(oio::into_read_from_stream(sink_from.map_ok(|v| v.into()))); + w.pipe(size, r).await } else { unreachable!( "writer state invalid while sink, expect Idle, actual {}", @@ -180,11 +180,11 @@ impl Writer { /// ``` pub async fn copy<R>(&mut self, size: u64, read_from: R) -> Result<u64> where - R: futures::AsyncRead + Send + Sync + Unpin + 'static, + R: futures::AsyncRead + futures::AsyncSeek + Send + Sync + Unpin + 'static, { if let State::Idle(Some(w)) = &mut self.state { - let s = Box::new(oio::into_stream_from_reader(read_from)); - w.pipe(size, s).await + let r = Box::new(oio::into_read_from_file(read_from, 0, size)); + w.pipe(size, 
r).await } else { unreachable!( "writer state invalid while copy, expect Idle, actual {}",
