This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch lazy-reader
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit c88362cf94838c2afc6488215dcd5fb0b252db86
Author: Xuanwo <[email protected]>
AuthorDate: Thu Oct 26 21:18:18 2023 +0800

    Save
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/complete.rs                   |  41 +++---
 core/src/layers/madsim.rs                     |   5 +-
 core/src/layers/prometheus.rs                 |  38 ++---
 core/src/layers/retry.rs                      |   2 +-
 core/src/raw/adapters/kv/backend.rs           |   5 +-
 core/src/raw/adapters/typed_kv/backend.rs     |   5 +-
 core/src/raw/oio/read/api.rs                  | 130 ++++++++++++++++-
 core/src/raw/oio/read/lazy_read.rs            | 198 ++++++++++++++++++++++++++
 core/src/raw/oio/read/mod.rs                  |   3 +
 core/src/raw/oio/read/range_read.rs           |   2 +-
 core/src/raw/rps.rs                           |  25 +---
 core/src/services/azblob/backend.rs           |   6 +-
 core/src/services/azdls/backend.rs            |   5 +-
 core/src/services/azfile/backend.rs           |   5 +-
 core/src/services/cos/backend.rs              |   5 +-
 core/src/services/dropbox/backend.rs          |   5 +-
 core/src/services/fs/backend.rs               |   4 +-
 core/src/services/ftp/backend.rs              |  14 +-
 core/src/services/gcs/backend.rs              |   3 +-
 core/src/services/gdrive/backend.rs           |  18 +--
 core/src/services/ghac/backend.rs             |   5 +-
 core/src/services/hdfs/backend.rs             |   4 +-
 core/src/services/http/backend.rs             |   5 +-
 core/src/services/ipfs/backend.rs             |   5 +-
 core/src/services/ipmfs/backend.rs            |   5 +-
 core/src/services/obs/backend.rs              |   5 +-
 core/src/services/onedrive/backend.rs         |   5 +-
 core/src/services/oss/backend.rs              |   5 +-
 core/src/services/s3/backend.rs               |   5 +-
 core/src/services/sftp/backend.rs             |   2 +-
 core/src/services/supabase/backend.rs         |   5 +-
 core/src/services/vercel_artifacts/backend.rs |   5 +-
 core/src/services/wasabi/backend.rs           |   5 +-
 core/src/services/webdav/backend.rs           |   5 +-
 core/src/services/webhdfs/backend.rs          |   5 +-
 core/src/types/operator/blocking_operator.rs  |  23 +--
 core/src/types/operator/operator.rs           |  33 +----
 37 files changed, 417 insertions(+), 229 deletions(-)

diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 903b64e6b..0a0a0bd3f 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -27,13 +27,13 @@ use std::task::Poll;
 use async_trait::async_trait;
 use bytes::Bytes;
 
-use crate::raw::oio::into_flat_page;
-use crate::raw::oio::into_hierarchy_page;
 use crate::raw::oio::Entry;
 use crate::raw::oio::FlatPager;
 use crate::raw::oio::HierarchyPager;
 use crate::raw::oio::RangeReader;
 use crate::raw::oio::StreamableReader;
+use crate::raw::oio::{into_flat_page, FileReader};
+use crate::raw::oio::{into_hierarchy_page, LazyReader};
 use crate::raw::*;
 use crate::*;
 
@@ -162,22 +162,24 @@ impl<A: Accessor> CompleteAccessor<A> {
         let seekable = capability.read_can_seek;
         let streamable = capability.read_can_next;
 
-        let (rp, r) = self.inner.read(path, args.clone()).await?;
-
         match (seekable, streamable) {
-            (true, true) => Ok((rp, CompleteReader::AlreadyComplete(r))),
+            (true, true) => {
+                let r = LazyReader::new(self.inner.clone(), path, args);
+                Ok((RpRead::new(), CompleteReader::AlreadyComplete(r)))
+            }
             (true, false) => {
-                let r = oio::into_streamable_read(r, 256 * 1024);
-                Ok((rp, CompleteReader::NeedStreamable(r)))
+                let r = FileReader::new(self.inner.clone(), path, args);
+
+                Ok((RpRead::new(), CompleteReader::NeedStreamable(r)))
             }
             _ => {
                 let r = RangeReader::new(self.inner.clone(), path, args);
 
                 if streamable {
-                    Ok((rp, CompleteReader::NeedSeekable(r)))
+                    Ok((RpRead::new(), CompleteReader::NeedSeekable(r)))
                 } else {
                     let r = oio::into_streamable_read(r, 256 * 1024);
-                    Ok((rp, CompleteReader::NeedBoth(r)))
+                    Ok((RpRead::new(), CompleteReader::NeedBoth(r)))
                 }
             }
         }
@@ -196,22 +198,23 @@ impl<A: Accessor> CompleteAccessor<A> {
         let seekable = capability.read_can_seek;
         let streamable = capability.read_can_next;
 
-        let (rp, r) = self.inner.blocking_read(path, args.clone())?;
-
         match (seekable, streamable) {
-            (true, true) => Ok((rp, CompleteReader::AlreadyComplete(r))),
+            (true, true) => {
+                let r = LazyReader::new(self.inner.clone(), path, args);
+                Ok((RpRead::new(), CompleteReader::AlreadyComplete(r)))
+            }
             (true, false) => {
-                let r = oio::into_streamable_read(r, 256 * 1024);
-                Ok((rp, CompleteReader::NeedStreamable(r)))
+                let r = FileReader::new(self.inner.clone(), path, args);
+                Ok((RpRead::new(), CompleteReader::NeedStreamable(r)))
             }
             _ => {
                 let r = RangeReader::new(self.inner.clone(), path, args);
 
                 if streamable {
-                    Ok((rp, CompleteReader::NeedSeekable(r)))
+                    Ok((RpRead::new(), CompleteReader::NeedSeekable(r)))
                 } else {
                     let r = oio::into_streamable_read(r, 256 * 1024);
-                    Ok((rp, CompleteReader::NeedBoth(r)))
+                    Ok((RpRead::new(), CompleteReader::NeedBoth(r)))
                 }
             }
         }
@@ -546,9 +549,9 @@ impl<A: Accessor> LayeredAccessor for CompleteAccessor<A> {
 }
 
 pub enum CompleteReader<A: Accessor, R> {
-    AlreadyComplete(R),
+    AlreadyComplete(LazyReader<A, R>),
     NeedSeekable(RangeReader<A, R>),
-    NeedStreamable(StreamableReader<R>),
+    NeedStreamable(FileReader<A, R>),
     NeedBoth(StreamableReader<RangeReader<A, R>>),
 }
 
@@ -788,7 +791,7 @@ mod tests {
         }
 
         async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-            Ok((RpRead::new(0), Box::new(())))
+            Ok((RpRead::new(), Box::new(())))
         }
 
         async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index 6e2a3b6a3..1d1253baf 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -191,10 +191,7 @@ impl LayeredAccessor for MadsimAccessor {
                 .downcast::<ReadResponse>()
                 .expect("fail to downcast response to ReadResponse");
             let content_length = resp.data.as_ref().map(|b| 
b.len()).unwrap_or(0);
-            Ok((
-                RpRead::new(content_length as u64),
-                MadsimReader { data: resp.data },
-            ))
+            Ok((RpRead::new(), MadsimReader { data: resp.data }))
         }
         #[cfg(not(madsim))]
         {
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 4692da0a6..f26a4af2b 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -320,28 +320,18 @@ impl<A: Accessor> LayeredAccessor for 
PrometheusAccessor<A> {
             .with_label_values(&labels)
             .start_timer();
 
-        let read_res = self
-            .inner
-            .read(path, args)
-            .map(|v| {
-                v.map(|(rp, r)| {
-                    self.stats
-                        .bytes_total
-                        .with_label_values(&labels)
-                        .observe(rp.metadata().content_length() as f64);
-                    (
-                        rp,
-                        PrometheusMetricWrapper::new(
-                            r,
-                            Operation::Read,
-                            self.stats.clone(),
-                            self.scheme,
-                            &path.to_string(),
-                        ),
-                    )
-                })
-            })
-            .await;
+        let read_res = self.inner.read(path, args).await.map(|(rp, r)| {
+            (
+                rp,
+                PrometheusMetricWrapper::new(
+                    r,
+                    Operation::Read,
+                    self.stats.clone(),
+                    self.scheme,
+                    &path.to_string(),
+                ),
+            )
+        });
         timer.observe_duration();
         read_res.map_err(|e| {
             self.stats.increment_errors_total(Operation::Read, e.kind());
@@ -546,10 +536,6 @@ impl<A: Accessor> LayeredAccessor for 
PrometheusAccessor<A> {
             .with_label_values(&labels)
             .start_timer();
         let result = self.inner.blocking_read(path, args).map(|(rp, r)| {
-            self.stats
-                .bytes_total
-                .with_label_values(&labels)
-                .observe(rp.metadata().content_length() as f64);
             (
                 rp,
                 PrometheusMetricWrapper::new(
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 67239d3fd..9ae6c877d 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -1164,7 +1164,7 @@ mod tests {
 
         async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, 
Self::Reader)> {
             Ok((
-                RpRead::new(13),
+                RpRead::new(),
                 MockReader {
                     attempt: self.attempt.clone(),
                     pos: 0,
diff --git a/core/src/raw/adapters/kv/backend.rs 
b/core/src/raw/adapters/kv/backend.rs
index 94db4de8d..799dd1be5 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -129,8 +129,7 @@ impl<S: Adapter> Accessor for Backend<S> {
 
         let bs = self.apply_range(bs, args.range());
 
-        let length = bs.len();
-        Ok((RpRead::new(length as u64), oio::Cursor::from(bs)))
+        Ok((RpRead::new(), oio::Cursor::from(bs)))
     }
 
     fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::BlockingReader)> {
@@ -142,7 +141,7 @@ impl<S: Adapter> Accessor for Backend<S> {
         };
 
         let bs = self.apply_range(bs, args.range());
-        Ok((RpRead::new(bs.len() as u64), oio::Cursor::from(bs)))
+        Ok((RpRead::new(), oio::Cursor::from(bs)))
     }
 
     async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
diff --git a/core/src/raw/adapters/typed_kv/backend.rs 
b/core/src/raw/adapters/typed_kv/backend.rs
index d5313b8e0..ca872346a 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -135,8 +135,7 @@ impl<S: Adapter> Accessor for Backend<S> {
 
         let bs = self.apply_range(bs, args.range());
 
-        let length = bs.len();
-        Ok((RpRead::new(length as u64), oio::Cursor::from(bs)))
+        Ok((RpRead::new(), oio::Cursor::from(bs)))
     }
 
     fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::BlockingReader)> {
@@ -149,7 +148,7 @@ impl<S: Adapter> Accessor for Backend<S> {
         };
 
         let bs = self.apply_range(bs, args.range());
-        Ok((RpRead::new(bs.len() as u64), oio::Cursor::from(bs)))
+        Ok((RpRead::new(), oio::Cursor::from(bs)))
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
diff --git a/core/src/raw/oio/read/api.rs b/core/src/raw/oio/read/api.rs
index 79fd77c33..436c8447f 100644
--- a/core/src/raw/oio/read/api.rs
+++ b/core/src/raw/oio/read/api.rs
@@ -17,10 +17,10 @@
 
 use std::fmt::Display;
 use std::fmt::Formatter;
-use std::io;
 use std::pin::Pin;
-use std::task::Context;
 use std::task::Poll;
+use std::task::{ready, Context};
+use std::{cmp, io};
 
 use bytes::Bytes;
 use futures::Future;
@@ -198,6 +198,16 @@ pub trait ReadExt: Read {
     fn next(&mut self) -> NextFuture<'_, Self> {
         NextFuture { reader: self }
     }
+
+    /// Build a future for `read_to_end`.
+    fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> 
ReadToEndFuture<'a, Self> {
+        let start = buf.len();
+        ReadToEndFuture {
+            reader: self,
+            buf,
+            start,
+        }
+    }
 }
 
 /// Make this future `!Unpin` for compatibility with async trait methods.
@@ -256,6 +266,82 @@ where
     }
 }
 
+/// Make this future `!Unpin` for compatibility with async trait methods.
+#[pin_project(!Unpin)]
+pub struct ReadToEndFuture<'a, R: Read + Unpin + ?Sized> {
+    reader: &'a mut R,
+    buf: &'a mut Vec<u8>,
+    start: usize,
+}
+
+impl<R> Future for ReadToEndFuture<'_, R>
+where
+    R: Read + Unpin + ?Sized,
+{
+    type Output = Result<usize>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize>> 
{
+        let this = self.project();
+
+        let mut g = ReadToEndGuard {
+            len: this.buf.len(),
+            buf: this.buf,
+            next: MIN_READ_TO_END_GROW_SIZE,
+        };
+
+        loop {
+            if g.buf.capacity() - g.buf.len() < g.next {
+                g.buf.reserve(g.next);
+                unsafe {
+                    g.buf.set_len(g.buf.capacity());
+                }
+            }
+
+            let buf = &mut g.buf[g.len..];
+            match ready!(this.reader.poll_read(cx, buf)) {
+                Ok(0) => return Poll::Ready(Ok(g.len - *this.start)),
+                Ok(n) => {
+                    g.next = if n >= g.next {
+                        cmp::min(g.next.saturating_mul(2), 
MAX_READ_TO_END_GROW_SIZE)
+                    } else if n >= g.next / 2 {
+                        g.next
+                    } else {
+                        cmp::max(g.next.saturating_div(2), 
MIN_READ_TO_END_GROW_SIZE)
+                    };
+                    // We can't allow bogus values from read. If it is too 
large, the returned vec could have its length
+                    // set past its capacity, or if it overflows the vec could 
be shortened which could create an invalid
+                    // string if this is called via read_to_string.
+                    assert!(n <= buf.len());
+                    g.len += n;
+                }
+                Err(e) => return Poll::Ready(Err(e)),
+            }
+        }
+    }
+}
+
+const MIN_READ_TO_END_GROW_SIZE: usize = 8 * 1024;
+const MAX_READ_TO_END_GROW_SIZE: usize = 4 * 1024 * 1024;
+
+/// ReadToEndGuard makes sure that the buf length is maintained correctly.
+struct ReadToEndGuard<'a> {
+    buf: &'a mut Vec<u8>,
+    /// Store the real length of buf.
+    len: usize,
+    next: usize,
+}
+
+impl Drop for ReadToEndGuard<'_> {
+    /// # Safety
+    ///
+    /// We make sure that the length of buf is maintained correctly.
+    fn drop(&mut self) {
+        unsafe {
+            self.buf.set_len(self.len);
+        }
+    }
+}
+
 /// BlockingReader is a boxed dyn `BlockingRead`.
 pub type BlockingReader = Box<dyn BlockingRead>;
 
@@ -278,6 +364,46 @@ pub trait BlockingRead: Send + Sync {
 
     /// Iterating [`Bytes`] from underlying reader.
     fn next(&mut self) -> Option<Result<Bytes>>;
+
+    /// Read all data of current reader to the end of buf.
+    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
+        let start_len = buf.len();
+        let mut g = ReadToEndGuard {
+            len: buf.len(),
+            buf,
+            next: MIN_READ_TO_END_GROW_SIZE,
+        };
+
+        loop {
+            if g.buf.capacity() - g.buf.len() < g.next {
+                g.buf.reserve(g.next);
+                unsafe {
+                    g.buf.set_len(g.buf.capacity());
+                }
+            }
+
+            let buf = &mut g.buf[g.len..];
+            match self.read(buf) {
+                Ok(0) => return Ok(g.len - start_len),
+                Ok(n) => {
+                    g.next = if n >= g.next {
+                        cmp::min(g.next.saturating_mul(2), 
MAX_READ_TO_END_GROW_SIZE)
+                    } else if n >= g.next / 2 {
+                        g.next
+                    } else {
+                        cmp::max(g.next.saturating_div(2), 
MIN_READ_TO_END_GROW_SIZE)
+                    };
+
+                    // We can't allow bogus values from read. If it is too 
large, the returned vec could have its length
+                    // set past its capacity, or if it overflows the vec could 
be shortened which could create an invalid
+                    // string if this is called via read_to_string.
+                    assert!(n <= buf.len());
+                    g.len += n;
+                }
+                Err(e) => return Err(e),
+            }
+        }
+    }
 }
 
 impl BlockingRead for () {
diff --git a/core/src/raw/oio/read/lazy_read.rs 
b/core/src/raw/oio/read/lazy_read.rs
new file mode 100644
index 000000000..89705deff
--- /dev/null
+++ b/core/src/raw/oio/read/lazy_read.rs
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::*;
+use crate::*;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::Future;
+use std::io::SeekFrom;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{ready, Context, Poll};
+
+/// LazyReader implements [`oio::Read`] in a lazy way.
+///
+/// The real requests are send when users calling read or seek.
+pub struct LazyReader<A: Accessor, R> {
+    acc: Arc<A>,
+    path: Arc<String>,
+    op: OpRead,
+    state: State<R>,
+}
+
+enum State<R> {
+    Idle,
+    Send(BoxFuture<'static, Result<(RpRead, R)>>),
+    Read(R),
+}
+
+/// Safety: State will only be accessed under &mut.
+unsafe impl<R> Sync for State<R> {}
+
+impl<A, R> LazyReader<A, R>
+where
+    A: Accessor,
+{
+    /// Create a new [`oio::Reader`] with lazy support.
+    pub fn new(acc: Arc<A>, path: &str, op: OpRead) -> LazyReader<A, R> {
+        LazyReader {
+            acc,
+            path: Arc::new(path.to_string()),
+            op,
+
+            state: State::<R>::Idle,
+        }
+    }
+}
+
+impl<A, R> LazyReader<A, R>
+where
+    A: Accessor<Reader = R>,
+    R: oio::Read,
+{
+    fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, R)>> {
+        let acc = self.acc.clone();
+        let path = self.path.clone();
+        let op = self.op.clone();
+
+        Box::pin(async move { acc.read(&path, op).await })
+    }
+}
+
+impl<A, R> oio::Read for LazyReader<A, R>
+where
+    A: Accessor<Reader = R>,
+    R: oio::Read,
+{
+    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> 
Poll<Result<usize>> {
+        match &mut self.state {
+            State::Idle => {
+                self.state = State::Send(self.read_future());
+                self.poll_read(cx, buf)
+            }
+            State::Send(fut) => {
+                let (_, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
+                    // If read future returns an error, we should reset
+                    // state to Idle so that we can retry it.
+                    self.state = State::Idle;
+                    err
+                })?;
+                self.state = State::Read(r);
+                self.poll_read(cx, buf)
+            }
+            State::Read(r) => r.poll_read(cx, buf),
+        }
+    }
+
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> 
Poll<Result<u64>> {
+        match &mut self.state {
+            State::Idle => {
+                self.state = State::Send(self.read_future());
+                self.poll_seek(cx, pos)
+            }
+            State::Send(fut) => {
+                let (_, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
+                    // If read future returns an error, we should reset
+                    // state to Idle so that we can retry it.
+                    self.state = State::Idle;
+                    err
+                })?;
+                self.state = State::Read(r);
+                self.poll_seek(cx, pos)
+            }
+            State::Read(r) => r.poll_seek(cx, pos),
+        }
+    }
+
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
+        match &mut self.state {
+            State::Idle => {
+                self.state = State::Send(self.read_future());
+                self.poll_next(cx)
+            }
+            State::Send(fut) => {
+                let (_, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
+                    // If read future returns an error, we should reset
+                    // state to Idle so that we can retry it.
+                    self.state = State::Idle;
+                    err
+                })?;
+                self.state = State::Read(r);
+                self.poll_next(cx)
+            }
+            State::Read(r) => r.poll_next(cx),
+        }
+    }
+}
+
+impl<A, R> oio::BlockingRead for LazyReader<A, R>
+where
+    A: Accessor<BlockingReader = R>,
+    R: oio::BlockingRead,
+{
+    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+        match &mut self.state {
+            State::Idle => {
+                let (_, r) = self.acc.blocking_read(&self.path, 
self.op.clone())?;
+                self.state = State::Read(r);
+                self.read(buf)
+            }
+            State::Read(r) => r.read(buf),
+            State::Send(_) => {
+                unreachable!(
+                    "It's invalid to go into State::Send for BlockingRead, 
please report this bug"
+                )
+            }
+        }
+    }
+
+    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
+        match &mut self.state {
+            State::Idle => {
+                let (_, r) = self.acc.blocking_read(&self.path, 
self.op.clone())?;
+                self.state = State::Read(r);
+                self.seek(pos)
+            }
+            State::Read(r) => r.seek(pos),
+            State::Send(_) => {
+                unreachable!(
+                    "It's invalid to go into State::Send for BlockingRead, 
please report this bug"
+                )
+            }
+        }
+    }
+
+    fn next(&mut self) -> Option<Result<Bytes>> {
+        match &mut self.state {
+            State::Idle => {
+                let r = match self.acc.blocking_read(&self.path, 
self.op.clone()) {
+                    Ok((_, r)) => r,
+                    Err(err) => return Some(Err(err)),
+                };
+                self.state = State::Read(r);
+                self.next()
+            }
+            State::Read(r) => r.next(),
+            State::Send(_) => {
+                unreachable!(
+                    "It's invalid to go into State::Send for BlockingRead, 
please report this bug"
+                )
+            }
+        }
+    }
+}
diff --git a/core/src/raw/oio/read/mod.rs b/core/src/raw/oio/read/mod.rs
index 16510636a..5f7d5d93a 100644
--- a/core/src/raw/oio/read/mod.rs
+++ b/core/src/raw/oio/read/mod.rs
@@ -45,3 +45,6 @@ pub use tokio_read::TokioReader;
 
 mod std_read;
 pub use std_read::StdReader;
+
+mod lazy_read;
+pub use lazy_read::LazyReader;
diff --git a/core/src/raw/oio/read/range_read.rs 
b/core/src/raw/oio/read/range_read.rs
index 870285e42..e6967cea5 100644
--- a/core/src/raw/oio/read/range_read.rs
+++ b/core/src/raw/oio/read/range_read.rs
@@ -637,7 +637,7 @@ mod tests {
             let bs = args.range().apply_on_bytes(self.data.clone());
 
             Ok((
-                RpRead::new(bs.len() as u64),
+                RpRead::new(),
                 MockReader {
                     inner: futures::io::Cursor::new(bs.into()),
                 },
diff --git a/core/src/raw/rps.rs b/core/src/raw/rps.rs
index bc45d1457..17e4470ae 100644
--- a/core/src/raw/rps.rs
+++ b/core/src/raw/rps.rs
@@ -98,31 +98,12 @@ impl<T: Default> From<PresignedRequest> for Request<T> {
 
 /// Reply for `read` operation.
 #[derive(Debug, Clone)]
-pub struct RpRead {
-    meta: Metadata,
-}
+pub struct RpRead {}
 
 impl RpRead {
     /// Create a new reply for `read`.
-    pub fn new(content_length: u64) -> Self {
-        RpRead {
-            meta: 
Metadata::new(EntryMode::FILE).with_content_length(content_length),
-        }
-    }
-
-    /// Create reply read with existing metadata.
-    pub fn with_metadata(meta: Metadata) -> Self {
-        RpRead { meta }
-    }
-
-    /// Get a ref of metadata.
-    pub fn metadata(&self) -> &Metadata {
-        &self.meta
-    }
-
-    /// Consume reply to get the meta.
-    pub fn into_metadata(self) -> Metadata {
-        self.meta
+    pub fn new() -> Self {
+        RpRead {}
     }
 }
 
diff --git a/core/src/services/azblob/backend.rs 
b/core/src/services/azblob/backend.rs
index de6d5e3e9..375a375d8 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -586,11 +586,7 @@ impl Accessor for AzblobBackend {
         let status = resp.status();
 
         match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
-                let meta = parse_into_metadata(path, resp.headers())?;
-
-                Ok((RpRead::with_metadata(meta), resp.into_body()))
-            }
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), 
resp.into_body())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/azdls/backend.rs 
b/core/src/services/azdls/backend.rs
index 9351a2522..1ab176255 100644
--- a/core/src/services/azdls/backend.rs
+++ b/core/src/services/azdls/backend.rs
@@ -292,10 +292,7 @@ impl Accessor for AzdlsBackend {
         let status = resp.status();
 
         match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
-                let meta = parse_into_metadata(path, resp.headers())?;
-                Ok((RpRead::with_metadata(meta), resp.into_body()))
-            }
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), 
resp.into_body())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/azfile/backend.rs 
b/core/src/services/azfile/backend.rs
index c7d8ad41c..78af5035d 100644
--- a/core/src/services/azfile/backend.rs
+++ b/core/src/services/azfile/backend.rs
@@ -310,10 +310,7 @@ impl Accessor for AzfileBackend {
         let status = resp.status();
 
         match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
-                let meta = parse_into_metadata(path, resp.headers())?;
-                Ok((RpRead::with_metadata(meta), resp.into_body()))
-            }
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), 
resp.into_body())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index d376abcb7..5ffe4be23 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -332,10 +332,7 @@ impl Accessor for CosBackend {
         let status = resp.status();
 
         match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
-                let meta = parse_into_metadata(path, resp.headers())?;
-                Ok((RpRead::with_metadata(meta), resp.into_body()))
-            }
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), 
resp.into_body())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/dropbox/backend.rs 
b/core/src/services/dropbox/backend.rs
index 8400b3300..bbe3b0b18 100644
--- a/core/src/services/dropbox/backend.rs
+++ b/core/src/services/dropbox/backend.rs
@@ -97,10 +97,7 @@ impl Accessor for DropboxBackend {
         let resp = self.core.dropbox_get(path, args).await?;
         let status = resp.status();
         match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
-                let meta = parse_into_metadata(path, resp.headers())?;
-                Ok((RpRead::with_metadata(meta), resp.into_body()))
-            }
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), 
resp.into_body())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 034d75e03..4b0d14207 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -327,7 +327,7 @@ impl Accessor for FsBackend {
         }
 
         let r = oio::TokioReader::new(f);
-        Ok((RpRead::new(0), r))
+        Ok((RpRead::new(), r))
     }
 
     async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
@@ -500,7 +500,7 @@ impl Accessor for FsBackend {
 
         let r = oio::StdReader::new(f);
 
-        Ok((RpRead::new(0), r))
+        Ok((RpRead::new(), r))
     }
 
     fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, 
Self::BlockingWriter)> {
diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs
index 2fe8ffe9a..0fe5a1778 100644
--- a/core/src/services/ftp/backend.rs
+++ b/core/src/services/ftp/backend.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp::min;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
@@ -319,37 +318,38 @@ impl Accessor for FtpBackend {
         return Ok(RpCreateDir::default());
     }
 
+    /// TODO: migrate to FileReader maybe?
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
         let mut ftp_stream = self.ftp_connect(Operation::Read).await?;
 
         let meta = self.ftp_stat(path).await?;
 
         let br = args.range();
-        let (r, size): (Box<dyn AsyncRead + Send + Unpin>, _) = match 
(br.offset(), br.size()) {
+        let r: Box<dyn AsyncRead + Send + Unpin> = match (br.offset(), 
br.size()) {
             (Some(offset), Some(size)) => {
                 ftp_stream.resume_transfer(offset as usize).await?;
                 let ds = ftp_stream.retr_as_stream(path).await?.take(size);
-                (Box::new(ds), min(size, meta.size() as u64 - offset))
+                Box::new(ds)
             }
             (Some(offset), None) => {
                 ftp_stream.resume_transfer(offset as usize).await?;
                 let ds = ftp_stream.retr_as_stream(path).await?;
-                (Box::new(ds), meta.size() as u64 - offset)
+                Box::new(ds)
             }
             (None, Some(size)) => {
                 ftp_stream
                     .resume_transfer((meta.size() as u64 - size) as usize)
                     .await?;
                 let ds = ftp_stream.retr_as_stream(path).await?;
-                (Box::new(ds), size)
+                Box::new(ds)
             }
             (None, None) => {
                 let ds = ftp_stream.retr_as_stream(path).await?;
-                (Box::new(ds), meta.size() as u64)
+                Box::new(ds)
             }
         };
 
-        Ok((RpRead::new(size), FtpReader::new(r, ftp_stream)))
+        Ok((RpRead::new(), FtpReader::new(r, ftp_stream)))
     }
 
     async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index e65fb155d..c667920d9 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -389,8 +389,7 @@ impl Accessor for GcsBackend {
         let resp = self.core.gcs_get_object(path, &args).await?;
 
         if resp.status().is_success() {
-            let meta = parse_into_metadata(path, resp.headers())?;
-            Ok((RpRead::with_metadata(meta), resp.into_body()))
+            Ok((RpRead::new(), resp.into_body()))
         } else {
             Err(parse_error(resp).await?)
         }
diff --git a/core/src/services/gdrive/backend.rs 
b/core/src/services/gdrive/backend.rs
index 879eab7cc..53ab999ea 100644
--- a/core/src/services/gdrive/backend.rs
+++ b/core/src/services/gdrive/backend.rs
@@ -118,26 +118,12 @@ impl Accessor for GdriveBackend {
     }
 
     async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-        // We need to request for metadata and body separately here.
-        // Request for metadata first to check if the file exists.
-        let resp = self.core.gdrive_stat(path).await?;
+        let resp = self.core.gdrive_get(path).await?;
 
         let status = resp.status();
 
         match status {
-            StatusCode::OK => {
-                let body = resp.into_body().bytes().await?;
-                let meta = self.parse_metadata(body)?;
-
-                let resp = self.core.gdrive_get(path).await?;
-
-                let status = resp.status();
-
-                match status {
-                    StatusCode::OK => Ok((RpRead::with_metadata(meta), 
resp.into_body())),
-                    _ => Err(parse_error(resp).await?),
-                }
-            }
+            StatusCode::OK => Ok((RpRead::new(), resp.into_body())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/ghac/backend.rs 
b/core/src/services/ghac/backend.rs
index 83c6a46d6..941d89ce7 100644
--- a/core/src/services/ghac/backend.rs
+++ b/core/src/services/ghac/backend.rs
@@ -350,10 +350,7 @@ impl Accessor for GhacBackend {
 
         let status = resp.status();
         match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
-                let meta = parse_into_metadata(path, resp.headers())?;
-                Ok((RpRead::with_metadata(meta), resp.into_body()))
-            }
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), 
resp.into_body())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/hdfs/backend.rs 
b/core/src/services/hdfs/backend.rs
index fed635b15..a52d9c284 100644
--- a/core/src/services/hdfs/backend.rs
+++ b/core/src/services/hdfs/backend.rs
@@ -213,7 +213,7 @@ impl Accessor for HdfsBackend {
 
         let r = oio::FuturesReader::new(f);
 
-        Ok((RpRead::new(0), r))
+        Ok((RpRead::new(), r))
     }
 
     async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
@@ -335,7 +335,7 @@ impl Accessor for HdfsBackend {
 
         let r = oio::StdReader::new(f);
 
-        Ok((RpRead::new(0), r))
+        Ok((RpRead::new(), r))
     }
 
     fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, 
Self::BlockingWriter)> {
diff --git a/core/src/services/http/backend.rs 
b/core/src/services/http/backend.rs
index af50631cc..2877d5aa2 100644
--- a/core/src/services/http/backend.rs
+++ b/core/src/services/http/backend.rs
@@ -237,10 +237,7 @@ impl Accessor for HttpBackend {
         let status = resp.status();
 
         match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
-                let meta = parse_into_metadata(path, resp.headers())?;
-                Ok((RpRead::with_metadata(meta), resp.into_body()))
-            }
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), 
resp.into_body())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/ipfs/backend.rs 
b/core/src/services/ipfs/backend.rs
index ebf1e327e..6a557d6a4 100644
--- a/core/src/services/ipfs/backend.rs
+++ b/core/src/services/ipfs/backend.rs
@@ -194,10 +194,7 @@ impl Accessor for IpfsBackend {
         let status = resp.status();
 
         match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
-                let meta = parse_into_metadata(path, resp.headers())?;
-                Ok((RpRead::with_metadata(meta), resp.into_body()))
-            }
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), 
resp.into_body())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/ipmfs/backend.rs 
b/core/src/services/ipmfs/backend.rs
index e7999767b..303b1db8d 100644
--- a/core/src/services/ipmfs/backend.rs
+++ b/core/src/services/ipmfs/backend.rs
@@ -112,10 +112,7 @@ impl Accessor for IpmfsBackend {
         let status = resp.status();
 
         match status {
-            StatusCode::OK => {
-                let meta = parse_into_metadata(path, resp.headers())?;
-                Ok((RpRead::with_metadata(meta), resp.into_body()))
-            }
+            StatusCode::OK => Ok((RpRead::new(), resp.into_body())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index aa5a86864..18bcefb52 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -359,10 +359,7 @@ impl Accessor for ObsBackend {
         let status = resp.status();
 
         match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
-                let meta = parse_into_metadata(path, resp.headers())?;
-                Ok((RpRead::with_metadata(meta), resp.into_body()))
-            }
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), 
resp.into_body())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/onedrive/backend.rs 
b/core/src/services/onedrive/backend.rs
index 0a7952e4a..8b1959b9b 100644
--- a/core/src/services/onedrive/backend.rs
+++ b/core/src/services/onedrive/backend.rs
@@ -93,10 +93,7 @@ impl Accessor for OnedriveBackend {
         let status = resp.status();
 
         match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
-                let meta = parse_into_metadata(path, resp.headers())?;
-                Ok((RpRead::with_metadata(meta), resp.into_body()))
-            }
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), 
resp.into_body())),
 
             _ => Err(parse_error(resp).await?),
         }
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 8ce0d65d8..043993699 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -472,10 +472,7 @@ impl Accessor for OssBackend {
         let status = resp.status();
 
         match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
-                let meta = parse_into_metadata(path, resp.headers())?;
-                Ok((RpRead::with_metadata(meta), resp.into_body()))
-            }
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), 
resp.into_body())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index f9e160502..e6b862982 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -969,10 +969,7 @@ impl Accessor for S3Backend {
         let status = resp.status();
 
         match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
-                let meta = parse_into_metadata(path, resp.headers())?;
-                Ok((RpRead::with_metadata(meta), resp.into_body()))
-            }
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), 
resp.into_body())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/sftp/backend.rs 
b/core/src/services/sftp/backend.rs
index 5c67dab76..792e27f49 100644
--- a/core/src/services/sftp/backend.rs
+++ b/core/src/services/sftp/backend.rs
@@ -300,7 +300,7 @@ impl Accessor for SftpBackend {
         // - `oio::TokioReader::new(x)` makes it a `oio::TokioReader` which 
implements `oio::Read`.
         let r = oio::TokioReader::new(Box::pin(TokioCompatFile::new(f)));
 
-        Ok((RpRead::new(0), r))
+        Ok((RpRead::new(), r))
     }
 
     async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
diff --git a/core/src/services/supabase/backend.rs 
b/core/src/services/supabase/backend.rs
index a5d0d3db1..b96bbd3ef 100644
--- a/core/src/services/supabase/backend.rs
+++ b/core/src/services/supabase/backend.rs
@@ -215,10 +215,7 @@ impl Accessor for SupabaseBackend {
         let status = resp.status();
 
         match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
-                let meta = parse_into_metadata(path, resp.headers())?;
-                Ok((RpRead::with_metadata(meta), resp.into_body()))
-            }
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), 
resp.into_body())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/vercel_artifacts/backend.rs 
b/core/src/services/vercel_artifacts/backend.rs
index 9a0ae95cf..432b82b68 100644
--- a/core/src/services/vercel_artifacts/backend.rs
+++ b/core/src/services/vercel_artifacts/backend.rs
@@ -74,10 +74,7 @@ impl Accessor for VercelArtifactsBackend {
         let status = resp.status();
 
         match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
-                let meta = parse_into_metadata(path, resp.headers())?;
-                Ok((RpRead::with_metadata(meta), resp.into_body()))
-            }
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), 
resp.into_body())),
 
             _ => Err(parse_error(resp).await?),
         }
diff --git a/core/src/services/wasabi/backend.rs 
b/core/src/services/wasabi/backend.rs
index a7a41042f..c2f746e77 100644
--- a/core/src/services/wasabi/backend.rs
+++ b/core/src/services/wasabi/backend.rs
@@ -738,10 +738,7 @@ impl Accessor for WasabiBackend {
         let status = resp.status();
 
         match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
-                let meta = parse_into_metadata(path, resp.headers())?;
-                Ok((RpRead::with_metadata(meta), resp.into_body()))
-            }
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), 
resp.into_body())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/webdav/backend.rs 
b/core/src/services/webdav/backend.rs
index 4c4cf6b83..24c80a3da 100644
--- a/core/src/services/webdav/backend.rs
+++ b/core/src/services/webdav/backend.rs
@@ -267,10 +267,7 @@ impl Accessor for WebdavBackend {
         let resp = self.webdav_get(path, args).await?;
         let status = resp.status();
         match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
-                let meta = parse_into_metadata(path, resp.headers())?;
-                Ok((RpRead::with_metadata(meta), resp.into_body()))
-            }
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), 
resp.into_body())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/webhdfs/backend.rs 
b/core/src/services/webhdfs/backend.rs
index 4adbc6ab9..3863b2dd3 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -465,10 +465,7 @@ impl Accessor for WebhdfsBackend {
         let range = args.range();
         let resp = self.webhdfs_read_file(path, range).await?;
         match resp.status() {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
-                let meta = parse_into_metadata(path, resp.headers())?;
-                Ok((RpRead::with_metadata(meta), resp.into_body()))
-            }
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), 
resp.into_body())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/types/operator/blocking_operator.rs 
b/core/src/types/operator/blocking_operator.rs
index 3dfa0d121..f55a1c48a 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -15,11 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::io::Read;
-
 use bytes::Bytes;
 
 use super::operator_functions::*;
+use crate::raw::oio::BlockingRead;
 use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
@@ -339,22 +338,12 @@ impl BlockingOperator {
                     );
                 }
 
-                let (rp, mut s) = inner.blocking_read(&path, args)?;
-                let mut buffer = 
Vec::with_capacity(rp.into_metadata().content_length() as usize);
+                let (_, mut s) = inner.blocking_read(&path, args)?;
 
-                match s.read_to_end(&mut buffer) {
-                    Ok(n) => {
-                        buffer.truncate(n);
-                        Ok(buffer)
-                    }
-                    Err(err) => Err(
-                        Error::new(ErrorKind::Unexpected, "blocking read_with 
failed")
-                            .with_operation("BlockingOperator::read_with")
-                            .with_context("service", 
inner.info().scheme().into_static())
-                            .with_context("path", &path)
-                            .set_source(err),
-                    ),
-                }
+                let mut buf = Vec::new();
+                s.read_to_end(&mut buf)?;
+
+                Ok(buf)
             },
         ))
     }
diff --git a/core/src/types/operator/operator.rs 
b/core/src/types/operator/operator.rs
index 00bb13e10..1abb1f027 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -20,15 +20,13 @@ use std::time::Duration;
 use bytes::Buf;
 use bytes::Bytes;
 use futures::stream;
-use futures::AsyncReadExt;
 use futures::Stream;
 use futures::StreamExt;
 use futures::TryStreamExt;
-use tokio::io::ReadBuf;
 
 use super::BlockingOperator;
 use crate::operator_futures::*;
-use crate::raw::oio::WriteExt;
+use crate::raw::oio::{ReadExt, WriteExt};
 use crate::raw::*;
 use crate::*;
 
@@ -368,32 +366,11 @@ impl Operator {
                         .with_context("path", &path));
                     }
 
-                    let br = args.range();
-                    let (rp, mut s) = inner.read(&path, args).await?;
+                    let (_, mut s) = inner.read(&path, args).await?;
+                    let mut buf = Vec::new();
+                    s.read_to_end(&mut buf).await?;
 
-                    let length = rp.into_metadata().content_length() as usize;
-                    let mut buffer = Vec::with_capacity(length);
-
-                    let dst = buffer.spare_capacity_mut();
-                    let mut buf = ReadBuf::uninit(dst);
-
-                    // Safety: the input buffer is created 
with_capacity(length).
-                    unsafe { buf.assume_init(length) };
-
-                    // TODO: use native read api
-                    s.read_exact(buf.initialized_mut()).await.map_err(|err| {
-                        Error::new(ErrorKind::Unexpected, "read from storage")
-                            .with_operation("read")
-                            .with_context("service", 
inner.info().scheme().into_static())
-                            .with_context("path", &path)
-                            .with_context("range", br.to_string())
-                            .set_source(err)
-                    })?;
-
-                    // Safety: read_exact makes sure this buffer has been 
filled.
-                    unsafe { buffer.set_len(length) }
-
-                    Ok(buffer)
+                    Ok(buf)
                 };
 
                 Box::pin(fut)


Reply via email to