This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 9086fcc18f refactor(core/raw): Align oio::BlockingRead API with 
oio::Read (#4349)
9086fcc18f is described below

commit 9086fcc18f49ed86b0140e23f083308a72a903a6
Author: Xuanwo <[email protected]>
AuthorDate: Wed Mar 13 01:09:38 2024 +0800

    refactor(core/raw): Align oio::BlockingRead API with oio::Read (#4349)
    
    * Remove next in read
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Refactor core
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * fix binding c
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix cpp
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Address python
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix test
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 bin/ofs/Cargo.lock                            |  20 +--
 bin/oli/Cargo.lock                            | 107 ++++++++++++++-
 bindings/c/src/reader.rs                      |  15 ++-
 bindings/cpp/src/reader.rs                    |   5 +-
 bindings/nodejs/src/lib.rs                    |   6 +-
 bindings/ocaml/src/operator/reader.rs         |   6 +-
 bindings/python/src/file.rs                   |   8 +-
 core/src/layers/blocking.rs                   |  18 +--
 core/src/layers/chaos.rs                      |  12 +-
 core/src/layers/concurrent_limit.rs           |   8 +-
 core/src/layers/dtrace.rs                     |  30 +----
 core/src/layers/error_context.rs              |  16 +--
 core/src/layers/logging.rs                    |  46 +------
 core/src/layers/metrics.rs                    |  23 +---
 core/src/layers/minitrace.rs                  |  10 +-
 core/src/layers/oteltrace.rs                  |   8 +-
 core/src/layers/prometheus.rs                 |  31 +----
 core/src/layers/prometheus_client.rs          |  24 +---
 core/src/layers/retry.rs                      |  23 +---
 core/src/layers/throttle.rs                   |   8 +-
 core/src/layers/tracing.rs                    |  12 +-
 core/src/raw/enum_utils.rs                    |  48 ++-----
 core/src/raw/oio/cursor.rs                    |  40 +++---
 core/src/raw/oio/read/api.rs                  |  72 +---------
 core/src/raw/oio/read/buffer_reader.rs        | 181 +++++++++++---------------
 core/src/raw/oio/read/file_read.rs            | 109 +++-------------
 core/src/raw/oio/read/into_streamable_read.rs |  32 ++---
 core/src/raw/oio/read/lazy_read.rs            |  13 +-
 core/src/raw/oio/read/range_read.rs           | 136 +++++++------------
 core/src/raw/oio/read/std_read.rs             |  42 +++---
 core/src/raw/oio/read/tokio_read.rs           |  16 +--
 core/src/raw/tests/read.rs                    |   9 +-
 core/src/types/operator/blocking_operator.rs  |   6 +-
 core/src/types/reader.rs                      | 109 ++++++++++++++--
 34 files changed, 494 insertions(+), 755 deletions(-)

diff --git a/bin/ofs/Cargo.lock b/bin/ofs/Cargo.lock
index 591f06532f..a3d40c91b6 100644
--- a/bin/ofs/Cargo.lock
+++ b/bin/ofs/Cargo.lock
@@ -114,9 +114,9 @@ checksum = 
"d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
 
 [[package]]
 name = "backon"
-version = "0.4.1"
+version = "0.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "0c1a6197b2120bb2185a267f6515038558b019e92b832bb0320e96d66268dcf9"
+checksum = "c491fa80d69c03084223a4e73c378dd9f9a1e612eb54051213f88b2d5249b458"
 dependencies = [
  "fastrand",
  "futures-core",
@@ -364,12 +364,9 @@ dependencies = [
 
 [[package]]
 name = "fastrand"
-version = "1.9.0"
+version = "2.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be"
-dependencies = [
- "instant",
-]
+checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5"
 
 [[package]]
 name = "flagset"
@@ -686,15 +683,6 @@ dependencies = [
  "hashbrown",
 ]
 
-[[package]]
-name = "instant"
-version = "0.1.12"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
-dependencies = [
- "cfg-if",
-]
-
 [[package]]
 name = "ipnet"
 version = "2.9.0"
diff --git a/bin/oli/Cargo.lock b/bin/oli/Cargo.lock
index a8f048c0cd..f28a894261 100644
--- a/bin/oli/Cargo.lock
+++ b/bin/oli/Cargo.lock
@@ -17,6 +17,17 @@ version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
 
+[[package]]
+name = "aes"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"
+dependencies = [
+ "cfg-if",
+ "cipher",
+ "cpufeatures",
+]
+
 [[package]]
 name = "aho-corasick"
 version = "1.1.2"
@@ -365,11 +376,11 @@ dependencies = [
 
 [[package]]
 name = "backon"
-version = "0.4.1"
+version = "0.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "0c1a6197b2120bb2185a267f6515038558b019e92b832bb0320e96d66268dcf9"
+checksum = "c491fa80d69c03084223a4e73c378dd9f9a1e612eb54051213f88b2d5249b458"
 dependencies = [
- "fastrand 1.9.0",
+ "fastrand 2.0.1",
  "futures-core",
  "pin-project",
  "tokio",
@@ -457,6 +468,15 @@ dependencies = [
  "generic-array",
 ]
 
+[[package]]
+name = "block-padding"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93"
+dependencies = [
+ "generic-array",
+]
+
 [[package]]
 name = "blocking"
 version = "1.5.1"
@@ -550,6 +570,15 @@ dependencies = [
  "serde_json",
 ]
 
+[[package]]
+name = "cbc"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6"
+dependencies = [
+ "cipher",
+]
+
 [[package]]
 name = "cc"
 version = "1.0.83"
@@ -589,6 +618,16 @@ dependencies = [
  "windows-targets 0.52.0",
 ]
 
+[[package]]
+name = "cipher"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad"
+dependencies = [
+ "crypto-common",
+ "inout",
+]
+
 [[package]]
 name = "clang-sys"
 version = "1.7.0"
@@ -1423,6 +1462,16 @@ dependencies = [
  "hashbrown 0.14.3",
 ]
 
+[[package]]
+name = "inout"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"
+dependencies = [
+ "block-padding",
+ "generic-array",
+]
+
 [[package]]
 name = "instant"
 version = "0.1.12"
@@ -1993,6 +2042,16 @@ dependencies = [
  "windows-targets 0.48.5",
 ]
 
+[[package]]
+name = "pbkdf2"
+version = "0.12.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2"
+dependencies = [
+ "digest",
+ "hmac",
+]
+
 [[package]]
 name = "peeking_take_while"
 version = "0.1.2"
@@ -2088,6 +2147,21 @@ dependencies = [
  "spki",
 ]
 
+[[package]]
+name = "pkcs5"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "e847e2c91a18bfa887dd028ec33f2fe6f25db77db3619024764914affe8b69a6"
+dependencies = [
+ "aes",
+ "cbc",
+ "der",
+ "pbkdf2",
+ "scrypt",
+ "sha2",
+ "spki",
+]
+
 [[package]]
 name = "pkcs8"
 version = "0.10.2"
@@ -2095,6 +2169,8 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7"
 dependencies = [
  "der",
+ "pkcs5",
+ "rand_core",
  "spki",
 ]
 
@@ -2446,9 +2522,9 @@ checksum = 
"c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
 
 [[package]]
 name = "reqsign"
-version = "0.14.7"
+version = "0.14.9"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "ed08ac3aa0676637644b1b892202f1ae789c28c15ebfa906128d111ae8086062"
+checksum = "43e319d9de9ff4d941abf4ac718897118b0fe04577ea3f8e0f5788971784eef5"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -2556,6 +2632,7 @@ dependencies = [
  "pkcs1",
  "pkcs8",
  "rand_core",
+ "sha2",
  "signature",
  "spki",
  "subtle",
@@ -2675,6 +2752,15 @@ version = "1.0.16"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c"
 
+[[package]]
+name = "salsa20"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "97a22f5af31f73a954c10289c93e8a50cc23d971e80ee446f1f6f7137a088213"
+dependencies = [
+ "cipher",
+]
+
 [[package]]
 name = "same-file"
 version = "1.0.6"
@@ -2699,6 +2785,17 @@ version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
 
+[[package]]
+name = "scrypt"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "0516a385866c09368f0b5bcd1caff3366aace790fcd46e2bb032697bb172fd1f"
+dependencies = [
+ "pbkdf2",
+ "salsa20",
+ "sha2",
+]
+
 [[package]]
 name = "sct"
 version = "0.7.1"
diff --git a/bindings/c/src/reader.rs b/bindings/c/src/reader.rs
index d0604aeebb..8f233dd8e4 100644
--- a/bindings/c/src/reader.rs
+++ b/bindings/c/src/reader.rs
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::io::Read;
-
 use ::opendal as core;
 
 use super::*;
@@ -51,12 +49,15 @@ impl opendal_reader {
         let buf = unsafe { std::slice::from_raw_parts_mut(buf, len) };
 
         let inner = unsafe { &mut *(*reader).inner };
-        let r = inner.read(buf);
+        let r = inner.read(buf.len());
         match r {
-            Ok(n) => opendal_result_reader_read {
-                size: n,
-                error: std::ptr::null_mut(),
-            },
+            Ok(bs) => {
+                buf[..bs.len()].copy_from_slice(&bs);
+                opendal_result_reader_read {
+                    size: bs.len(),
+                    error: std::ptr::null_mut(),
+                }
+            }
             Err(e) => opendal_result_reader_read {
                 size: 0,
                 error: opendal_error::new(
diff --git a/bindings/cpp/src/reader.rs b/bindings/cpp/src/reader.rs
index 6b56d1a4b5..6f9bfd8fe9 100644
--- a/bindings/cpp/src/reader.rs
+++ b/bindings/cpp/src/reader.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use anyhow::Result;
-use od::raw::oio::BlockingRead;
 use opendal as od;
 
 use super::ffi;
@@ -25,7 +24,9 @@ pub struct Reader(pub od::BlockingReader);
 
 impl Reader {
     pub fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        Ok(self.0.read(buf)?)
+        let bs = self.0.read(buf.len())?;
+        buf[..bs.len()].copy_from_slice(&bs);
+        Ok(bs.len())
     }
 
     pub fn seek(&mut self, offset: u64, dir: ffi::SeekFrom) -> Result<u64> {
diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs
index 4a0a9e188d..fec882b879 100644
--- a/bindings/nodejs/src/lib.rs
+++ b/bindings/nodejs/src/lib.rs
@@ -26,7 +26,6 @@ use std::time::Duration;
 
 use futures::TryStreamExt;
 use napi::bindgen_prelude::*;
-use opendal::raw::oio::BlockingRead;
 
 #[napi]
 pub struct Operator(opendal::Operator);
@@ -648,7 +647,10 @@ pub struct BlockingReader(opendal::BlockingReader);
 impl BlockingReader {
     #[napi]
     pub fn read(&mut self, mut buf: Buffer) -> Result<usize> {
-        self.0.read(buf.as_mut()).map_err(format_napi_error)
+        let buf = buf.as_mut();
+        let bs = self.0.read(buf.len()).map_err(format_napi_error)?;
+        buf[..bs.len()].copy_from_slice(&bs);
+        Ok(bs.len())
     }
 }
 
diff --git a/bindings/ocaml/src/operator/reader.rs 
b/bindings/ocaml/src/operator/reader.rs
index b3bc7bc587..8ae7b4583b 100644
--- a/bindings/ocaml/src/operator/reader.rs
+++ b/bindings/ocaml/src/operator/reader.rs
@@ -17,14 +17,14 @@
 
 use std::io;
 
-use opendal::raw::oio::BlockingRead;
-
 use super::*;
 
 #[ocaml::func]
 #[ocaml::sig("reader -> bytes -> (int, string) Result.t ")]
 pub fn reader_read(reader: &mut Reader, buf: &mut [u8]) -> Result<usize, 
String> {
-    map_res_error(reader.0.read(buf))
+    let bs = map_res_error(reader.0.read(buf.len()))?;
+    buf[..bs.len()].copy_from_slice(&bs);
+    Ok(bs.len())
 }
 
 #[ocaml::func]
diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs
index 806306ca5a..f8a98c8f51 100644
--- a/bindings/python/src/file.rs
+++ b/bindings/python/src/file.rs
@@ -18,7 +18,6 @@
 // Remove this `allow` after 
<https://github.com/rust-lang/rust-clippy/issues/12039> fixed.
 #![allow(clippy::unnecessary_fallible_conversions)]
 
-use std::io::Read;
 use std::io::Seek;
 use std::io::SeekFrom;
 use std::io::Write;
@@ -77,11 +76,10 @@ impl File {
 
         let buffer = match size {
             Some(size) => {
-                let mut buffer = vec![0; size];
-                reader
-                    .read_exact(&mut buffer)
+                let bs = reader
+                    .read_exact(size)
                     .map_err(|err| PyIOError::new_err(err.to_string()))?;
-                buffer
+                bs.to_vec()
             }
             None => {
                 let mut buffer = Vec::new();
diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs
index 3476dc6e90..84a0c12292 100644
--- a/core/src/layers/blocking.rs
+++ b/core/src/layers/blocking.rs
@@ -17,7 +17,7 @@
 
 use async_trait::async_trait;
 use bytes;
-use bytes::{BufMut, Bytes};
+use bytes::Bytes;
 use futures::future::poll_fn;
 use tokio::runtime::Handle;
 
@@ -288,25 +288,13 @@ impl<I> BlockingWrapper<I> {
 }
 
 impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> {
-    fn read(&mut self, mut buf: &mut [u8]) -> Result<usize> {
-        let bs = self.handle.block_on(self.inner.read(buf.len()));
-        let bs = bs?;
-        buf.put_slice(&bs);
-        Ok(bs.len())
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
+        self.handle.block_on(self.inner.read(limit))
     }
 
     fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64> {
         self.handle.block_on(self.inner.seek(pos))
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        let bs = self.handle.block_on(self.inner.read(4 * 1024 * 1024));
-        match bs {
-            Ok(bs) if bs.is_empty() => None,
-            Ok(bs) => Some(Ok(bs)),
-            Err(err) => Some(Err(err)),
-        }
-    }
 }
 
 impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
diff --git a/core/src/layers/chaos.rs b/core/src/layers/chaos.rs
index 079500c444..55a28a554a 100644
--- a/core/src/layers/chaos.rs
+++ b/core/src/layers/chaos.rs
@@ -192,9 +192,9 @@ impl<R: oio::Read> oio::Read for ChaosReader<R> {
 }
 
 impl<R: oio::BlockingRead> oio::BlockingRead for ChaosReader<R> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
         if self.i_feel_lucky() {
-            self.inner.read(buf)
+            self.inner.read(limit)
         } else {
             Err(Self::unexpected_eof())
         }
@@ -207,12 +207,4 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
ChaosReader<R> {
             Err(Self::unexpected_eof())
         }
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        if self.i_feel_lucky() {
-            self.inner.next()
-        } else {
-            Some(Err(Self::unexpected_eof()))
-        }
-    }
 }
diff --git a/core/src/layers/concurrent_limit.rs 
b/core/src/layers/concurrent_limit.rs
index 36fbcb92e7..8a663cfc47 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -267,17 +267,13 @@ impl<R: oio::Read> oio::Read for 
ConcurrentLimitWrapper<R> {
 }
 
 impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        self.inner.read(buf)
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
+        self.inner.read(limit)
     }
 
     fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
         self.inner.seek(pos)
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        self.inner.next()
-    }
 }
 
 impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
diff --git a/core/src/layers/dtrace.rs b/core/src/layers/dtrace.rs
index 4e488dd499..4c4af268a2 100644
--- a/core/src/layers/dtrace.rs
+++ b/core/src/layers/dtrace.rs
@@ -376,14 +376,14 @@ impl<R: oio::Read> oio::Read for DtraceLayerWrapper<R> {
 }
 
 impl<R: oio::BlockingRead> oio::BlockingRead for DtraceLayerWrapper<R> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
         let c_path = CString::new(self.path.clone()).unwrap();
         probe_lazy!(opendal, blocking_reader_read_start, c_path.as_ptr());
         self.inner
-            .read(buf)
-            .map(|n| {
-                probe_lazy!(opendal, blocking_reader_read_ok, c_path.as_ptr(), 
n);
-                n
+            .read(limit)
+            .map(|bs| {
+                probe_lazy!(opendal, blocking_reader_read_ok, c_path.as_ptr(), 
bs.len());
+                bs
             })
             .map_err(|e| {
                 probe_lazy!(opendal, blocking_reader_read_error, 
c_path.as_ptr());
@@ -405,26 +405,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
DtraceLayerWrapper<R> {
                 e
             })
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        let c_path = CString::new(self.path.clone()).unwrap();
-        probe_lazy!(opendal, blocking_reader_next_start, c_path.as_ptr());
-        self.inner.next().map(|res| match res {
-            Ok(bytes) => {
-                probe_lazy!(
-                    opendal,
-                    blocking_reader_next_ok,
-                    c_path.as_ptr(),
-                    bytes.len()
-                );
-                Ok(bytes)
-            }
-            Err(e) => {
-                probe_lazy!(opendal, blocking_reader_next_error, 
c_path.as_ptr());
-                Err(e)
-            }
-        })
-    }
 }
 
 impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 43b08ea540..f8112951a1 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -367,12 +367,12 @@ impl<T: oio::Read> oio::Read for ErrorContextWrapper<T> {
 }
 
 impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        self.inner.read(buf).map_err(|err| {
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
+        self.inner.read(limit).map_err(|err| {
             err.with_operation(ReadOperation::BlockingRead)
                 .with_context("service", self.scheme)
                 .with_context("path", &self.path)
-                .with_context("read_buf", buf.len().to_string())
+                .with_context("limit", limit.to_string())
         })
     }
 
@@ -384,16 +384,6 @@ impl<T: oio::BlockingRead> oio::BlockingRead for 
ErrorContextWrapper<T> {
                 .with_context("seek", format!("{pos:?}"))
         })
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        self.inner.next().map(|v| {
-            v.map_err(|err| {
-                err.with_operation(ReadOperation::BlockingNext)
-                    .with_context("service", self.scheme)
-                    .with_context("path", &self.path)
-            })
-        })
-    }
 }
 
 #[async_trait::async_trait]
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 2f9fed02b1..02abb3f3df 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1057,10 +1057,10 @@ impl<R: oio::Read> oio::Read for LoggingReader<R> {
 }
 
 impl<R: oio::BlockingRead> oio::BlockingRead for LoggingReader<R> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        match self.inner.read(buf) {
-            Ok(n) => {
-                self.read += n as u64;
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
+        match self.inner.read(limit) {
+            Ok(bs) => {
+                self.read += bs.len() as u64;
                 trace!(
                     target: LOGGING_TARGET,
                     "service={} operation={} path={} read={} -> data read {}B",
@@ -1068,9 +1068,9 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
LoggingReader<R> {
                     ReadOperation::BlockingRead,
                     self.path,
                     self.read,
-                    n
+                    bs.len()
                 );
-                Ok(n)
+                Ok(bs)
             }
             Err(err) => {
                 if let Some(lvl) = self.ctx.error_level(&err) {
@@ -1121,40 +1121,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
LoggingReader<R> {
             }
         }
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        match self.inner.next() {
-            Some(Ok(bs)) => {
-                self.read += bs.len() as u64;
-                trace!(
-                    target: LOGGING_TARGET,
-                    "service={} operation={} path={} read={} -> data read {}B",
-                    self.ctx.scheme,
-                    ReadOperation::BlockingNext,
-                    self.path,
-                    self.read,
-                    bs.len()
-                );
-                Some(Ok(bs))
-            }
-            Some(Err(err)) => {
-                if let Some(lvl) = self.ctx.error_level(&err) {
-                    log!(
-                        target: LOGGING_TARGET,
-                        lvl,
-                        "service={} operation={} path={} read={} -> data read 
failed: {}",
-                        self.ctx.scheme,
-                        ReadOperation::BlockingNext,
-                        self.path,
-                        self.read,
-                        self.ctx.error_print(&err),
-                    )
-                }
-                Some(Err(err))
-            }
-            None => None,
-        }
-    }
 }
 
 pub struct LoggingWriter<W> {
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index c6001b8678..43b24f803d 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -798,12 +798,12 @@ impl<R: oio::Read> oio::Read for MetricWrapper<R> {
 }
 
 impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
         self.inner
-            .read(buf)
-            .map(|n| {
-                self.bytes += n as u64;
-                n
+            .read(limit)
+            .map(|bs| {
+                self.bytes += bs.len() as u64;
+                bs
             })
             .map_err(|e| {
                 self.handle.increment_errors_total(self.op, e.kind());
@@ -817,19 +817,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
MetricWrapper<R> {
             err
         })
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        self.inner.next().map(|res| match res {
-            Ok(bytes) => {
-                self.bytes += bytes.len() as u64;
-                Ok(bytes)
-            }
-            Err(e) => {
-                self.handle.increment_errors_total(self.op, e.kind());
-                Err(e)
-            }
-        })
-    }
 }
 
 impl<R: oio::Write> oio::Write for MetricWrapper<R> {
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index fbf4b2f064..fdc6a1dc92 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -309,10 +309,10 @@ impl<R: oio::Read> oio::Read for MinitraceWrapper<R> {
 }
 
 impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
         let _g = self.span.set_local_parent();
         let _span = 
LocalSpan::enter_with_local_parent(ReadOperation::BlockingRead.into_static());
-        self.inner.read(buf)
+        self.inner.read(limit)
     }
 
     fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
@@ -320,12 +320,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
MinitraceWrapper<R> {
         let _span = 
LocalSpan::enter_with_local_parent(ReadOperation::BlockingSeek.into_static());
         self.inner.seek(pos)
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        let _g = self.span.set_local_parent();
-        let _span = 
LocalSpan::enter_with_local_parent(ReadOperation::BlockingNext.into_static());
-        self.inner.next()
-    }
 }
 
 impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index f8f85be255..90e8d65392 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -288,17 +288,13 @@ impl<R: oio::Read> oio::Read for OtelTraceWrapper<R> {
 }
 
 impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        self.inner.read(buf)
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
+        self.inner.read(limit)
     }
 
     fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
         self.inner.seek(pos)
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        self.inner.next()
-    }
 }
 
 impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 0037217d99..a5179908eb 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -719,20 +719,20 @@ impl<R: oio::Read> oio::Read for 
PrometheusMetricWrapper<R> {
 }
 
 impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
         let labels = self.stats.generate_metric_label(
             self.scheme.into_static(),
             Operation::BlockingRead.into_static(),
             &self.path,
         );
         self.inner
-            .read(buf)
-            .map(|n| {
+            .read(limit)
+            .map(|bs| {
                 self.stats
                     .bytes_total
                     .with_label_values(&labels)
-                    .observe(n as f64);
-                n
+                    .observe(bs.len() as f64);
+                bs
             })
             .map_err(|e| {
                 self.stats.increment_errors_total(self.op, e.kind());
@@ -746,27 +746,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
PrometheusMetricWrapper<R> {
             err
         })
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        let labels = self.stats.generate_metric_label(
-            self.scheme.into_static(),
-            Operation::BlockingRead.into_static(),
-            &self.path,
-        );
-        self.inner.next().map(|res| match res {
-            Ok(bytes) => {
-                self.stats
-                    .bytes_total
-                    .with_label_values(&labels)
-                    .observe(bytes.len() as f64);
-                Ok(bytes)
-            }
-            Err(e) => {
-                self.stats.increment_errors_total(self.op, e.kind());
-                Err(e)
-            }
-        })
-    }
 }
 
 impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
diff --git a/core/src/layers/prometheus_client.rs 
b/core/src/layers/prometheus_client.rs
index 7c12f3ecfd..95e9bd1631 100644
--- a/core/src/layers/prometheus_client.rs
+++ b/core/src/layers/prometheus_client.rs
@@ -566,12 +566,12 @@ impl<R: oio::Read> oio::Read for 
PrometheusMetricWrapper<R> {
 }
 
 impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
         self.inner
-            .read(buf)
-            .map(|n| {
-                self.bytes_total += n;
-                n
+            .read(limit)
+            .map(|bs| {
+                self.bytes_total += bs.len();
+                bs
             })
             .map_err(|e| {
                 self.metrics
@@ -587,20 +587,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
PrometheusMetricWrapper<R> {
             err
         })
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        self.inner.next().map(|res| match res {
-            Ok(bytes) => {
-                self.bytes_total += bytes.len();
-                Ok(bytes)
-            }
-            Err(e) => {
-                self.metrics
-                    .increment_errors_total(self.scheme, self.op, e.kind());
-                Err(e)
-            }
-        })
-    }
 }
 
 impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 5c4fce8dcd..e6582114e8 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -737,8 +737,8 @@ impl<R: oio::Read, I: RetryInterceptor> oio::Read for 
RetryWrapper<R, I> {
 }
 
 impl<R: oio::BlockingRead, I: RetryInterceptor> oio::BlockingRead for 
RetryWrapper<R, I> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        { || self.inner.as_mut().unwrap().read(buf) }
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
+        { || self.inner.as_mut().unwrap().read(limit) }
             .retry(&self.builder)
             .when(|e| e.is_temporary())
             .notify(|err, dur| {
@@ -772,25 +772,6 @@ impl<R: oio::BlockingRead, I: RetryInterceptor> 
oio::BlockingRead for RetryWrapp
             .call()
             .map_err(|e| e.set_persistent())
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        { || self.inner.as_mut().unwrap().next().transpose() }
-            .retry(&self.builder)
-            .when(|e| e.is_temporary())
-            .notify(|err, dur| {
-                self.notify.intercept(
-                    err,
-                    dur,
-                    &[
-                        ("operation", 
ReadOperation::BlockingNext.into_static()),
-                        ("path", &self.path),
-                    ],
-                );
-            })
-            .call()
-            .map_err(|e| e.set_persistent())
-            .transpose()
-    }
 }
 
 impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index 3e42839f46..0082a62735 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -196,18 +196,14 @@ impl<R: oio::Read> oio::Read for ThrottleWrapper<R> {
 }
 
 impl<R: oio::BlockingRead> oio::BlockingRead for ThrottleWrapper<R> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
         // TODO: How can we handle buffer reads with a limiter?
-        self.inner.read(buf)
+        self.inner.read(limit)
     }
 
     fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
         self.inner.seek(pos)
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        self.inner.next()
-    }
 }
 
 impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 6dfd34138d..017264cd32 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -290,8 +290,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
TracingWrapper<R> {
         parent = &self.span,
         level = "trace",
         skip_all)]
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        self.inner.read(buf)
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
+        self.inner.read(limit)
     }
 
     #[tracing::instrument(
@@ -301,14 +301,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for 
TracingWrapper<R> {
     fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
         self.inner.seek(pos)
     }
-
-    #[tracing::instrument(
-        parent = &self.span,
-        level = "trace",
-        skip_all)]
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        self.inner.next()
-    }
 }
 
 impl<R: oio::Write> oio::Write for TracingWrapper<R> {
diff --git a/core/src/raw/enum_utils.rs b/core/src/raw/enum_utils.rs
index e2d2a10dff..430e513774 100644
--- a/core/src/raw/enum_utils.rs
+++ b/core/src/raw/enum_utils.rs
@@ -74,10 +74,10 @@ impl<ONE: oio::Read, TWO: oio::Read> oio::Read for 
TwoWays<ONE, TWO> {
 }
 
 impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead> oio::BlockingRead for 
TwoWays<ONE, TWO> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
         match self {
-            Self::One(v) => v.read(buf),
-            Self::Two(v) => v.read(buf),
+            Self::One(v) => v.read(limit),
+            Self::Two(v) => v.read(limit),
         }
     }
 
@@ -87,13 +87,6 @@ impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead> 
oio::BlockingRead for TwoWa
             Self::Two(v) => v.seek(pos),
         }
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        match self {
-            Self::One(v) => v.next(),
-            Self::Two(v) => v.next(),
-        }
-    }
 }
 
 impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWays<ONE, TWO> {
@@ -152,11 +145,11 @@ impl<ONE: oio::Read, TWO: oio::Read, THREE: oio::Read> 
oio::Read for ThreeWays<O
 impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead, THREE: oio::BlockingRead> 
oio::BlockingRead
     for ThreeWays<ONE, TWO, THREE>
 {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
         match self {
-            Self::One(v) => v.read(buf),
-            Self::Two(v) => v.read(buf),
-            Self::Three(v) => v.read(buf),
+            Self::One(v) => v.read(limit),
+            Self::Two(v) => v.read(limit),
+            Self::Three(v) => v.read(limit),
         }
     }
 
@@ -167,14 +160,6 @@ impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead, 
THREE: oio::BlockingRead> o
             Self::Three(v) => v.seek(pos),
         }
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        match self {
-            Self::One(v) => v.next(),
-            Self::Two(v) => v.next(),
-            Self::Three(v) => v.next(),
-        }
-    }
 }
 
 impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
@@ -252,12 +237,12 @@ where
     THREE: oio::BlockingRead,
     FOUR: oio::BlockingRead,
 {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
         match self {
-            Self::One(v) => v.read(buf),
-            Self::Two(v) => v.read(buf),
-            Self::Three(v) => v.read(buf),
-            Self::Four(v) => v.read(buf),
+            Self::One(v) => v.read(limit),
+            Self::Two(v) => v.read(limit),
+            Self::Three(v) => v.read(limit),
+            Self::Four(v) => v.read(limit),
         }
     }
 
@@ -269,15 +254,6 @@ where
             Self::Four(v) => v.seek(pos),
         }
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        match self {
-            Self::One(v) => v.next(),
-            Self::Two(v) => v.next(),
-            Self::Three(v) => v.next(),
-            Self::Four(v) => v.next(),
-        }
-    }
 }
 
 impl<ONE, TWO, THREE, FOUR> oio::List for FourWays<ONE, TWO, THREE, FOUR>
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index a920652451..75a034f102 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use std::cmp::min;
-use std::io::Read;
 use std::io::SeekFrom;
 use std::task::Context;
 use std::task::Poll;
@@ -72,6 +71,18 @@ impl From<Vec<u8>> for Cursor {
 }
 
 impl oio::Read for Cursor {
+    async fn read(&mut self, limit: usize) -> Result<Bytes> {
+        if self.is_empty() {
+            Ok(Bytes::new())
+        } else {
+            // The clone here is required as we don't want to change it.
+            let mut bs = self.inner.clone().split_off(self.pos as usize);
+            let bs = bs.split_to(min(bs.len(), limit));
+            self.pos += bs.len() as u64;
+            Ok(bs)
+        }
+    }
+
     async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
         let (base, amt) = match pos {
             SeekFrom::Start(n) => (0, n as i64),
@@ -91,8 +102,10 @@ impl oio::Read for Cursor {
         self.pos = n;
         Ok(n)
     }
+}
 
-    async fn read(&mut self, limit: usize) -> Result<Bytes> {
+impl oio::BlockingRead for Cursor {
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
         if self.is_empty() {
             Ok(Bytes::new())
         } else {
@@ -103,18 +116,6 @@ impl oio::Read for Cursor {
             Ok(bs)
         }
     }
-}
-
-impl oio::BlockingRead for Cursor {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        let n = Read::read(&mut self.remaining_slice(), buf).map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "read data from Cursor")
-                .with_context("source", "Cursor")
-                .set_source(err)
-        })?;
-        self.pos += n as u64;
-        Ok(n)
-    }
 
     fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
         let (base, amt) = match pos {
@@ -135,17 +136,6 @@ impl oio::BlockingRead for Cursor {
         self.pos = n;
         Ok(n)
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        if self.is_empty() {
-            None
-        } else {
-            // The clone here is required as we don't want to change it.
-            let bs = self.inner.clone().split_off(self.pos as usize);
-            self.pos += bs.len() as u64;
-            Some(Ok(bs))
-        }
-    }
 }
 
 impl oio::Stream for Cursor {
diff --git a/core/src/raw/oio/read/api.rs b/core/src/raw/oio/read/api.rs
index 22865375ab..d8ed05c4ec 100644
--- a/core/src/raw/oio/read/api.rs
+++ b/core/src/raw/oio/read/api.rs
@@ -22,7 +22,6 @@ use std::ops::DerefMut;
 
 use bytes::Bytes;
 use futures::Future;
-use tokio::io::ReadBuf;
 
 use crate::raw::BoxedFuture;
 use crate::*;
@@ -177,65 +176,15 @@ pub type BlockingReader = Box<dyn BlockingRead>;
 /// is optional. We use `Read` to make users life easier.
 pub trait BlockingRead: Send + Sync {
     /// Read synchronously.
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize>;
+    fn read(&mut self, limit: usize) -> Result<Bytes>;
 
     /// Seek synchronously.
     fn seek(&mut self, pos: io::SeekFrom) -> Result<u64>;
-
-    /// Iterating [`Bytes`] from underlying reader.
-    fn next(&mut self) -> Option<Result<Bytes>>;
-
-    /// Read all data of current reader to the end of buf.
-    fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
-        let start_len = buf.len();
-        let start_cap = buf.capacity();
-
-        loop {
-            if buf.len() == buf.capacity() {
-                buf.reserve(32); // buf is full, need more space
-            }
-
-            let spare = buf.spare_capacity_mut();
-            let mut read_buf: ReadBuf = ReadBuf::uninit(spare);
-
-            // SAFETY: These bytes were initialized but not filled in the 
previous loop
-            unsafe {
-                read_buf.assume_init(read_buf.capacity());
-            }
-
-            match self.read(read_buf.initialize_unfilled()) {
-                Ok(0) => return Ok(buf.len() - start_len),
-                Ok(n) => {
-                    // SAFETY: Read API makes sure that returning `n` is 
correct.
-                    unsafe {
-                        buf.set_len(buf.len() + n);
-                    }
-                }
-                Err(e) => return Err(e),
-            }
-
-            // The buffer might be an exact fit. Let's read into a probe buffer
-            // and see if it returns `Ok(0)`. If so, we've avoided an
-            // unnecessary doubling of the capacity. But if not, append the
-            // probe buffer to the primary buffer and let its capacity grow.
-            if buf.len() == buf.capacity() && buf.capacity() == start_cap {
-                let mut probe = [0u8; 32];
-
-                match self.read(&mut probe) {
-                    Ok(0) => return Ok(buf.len() - start_len),
-                    Ok(n) => {
-                        buf.extend_from_slice(&probe[..n]);
-                    }
-                    Err(e) => return Err(e),
-                }
-            }
-        }
-    }
 }
 
 impl BlockingRead for () {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        let _ = buf;
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
+        let _ = limit;
 
         unimplemented!("read is required to be implemented for 
oio::BlockingRead")
     }
@@ -248,27 +197,16 @@ impl BlockingRead for () {
             "output blocking reader doesn't support seeking",
         ))
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        Some(Err(Error::new(
-            ErrorKind::Unsupported,
-            "output reader doesn't support iterating",
-        )))
-    }
 }
 
 /// `Box<dyn BlockingRead>` won't implement `BlockingRead` automatically.
 /// To make BlockingReader work as expected, we must add this impl.
 impl<T: BlockingRead + ?Sized> BlockingRead for Box<T> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        (**self).read(buf)
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
+        (**self).read(limit)
     }
 
     fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
         (**self).seek(pos)
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        (**self).next()
-    }
 }
diff --git a/core/src/raw/oio/read/buffer_reader.rs 
b/core/src/raw/oio/read/buffer_reader.rs
index 33709b2355..733878d1cc 100644
--- a/core/src/raw/oio/read/buffer_reader.rs
+++ b/core/src/raw/oio/read/buffer_reader.rs
@@ -18,7 +18,6 @@
 use std::cmp::min;
 use std::io::SeekFrom;
 
-use bytes::BufMut;
 use bytes::Bytes;
 use tokio::io::ReadBuf;
 
@@ -136,30 +135,6 @@ impl<R> oio::Read for BufferReader<R>
 where
     R: oio::Read,
 {
-    async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
-        match pos {
-            SeekFrom::Start(new_pos) => {
-                // TODO(weny): Check the overflowing.
-                let Some(offset) = (new_pos as i64).checked_sub(self.cur as 
i64) else {
-                    return self.inner_seek(pos).await;
-                };
-
-                match self.seek_relative(offset) {
-                    Some(cur) => Ok(cur),
-                    None => self.inner_seek(pos).await,
-                }
-            }
-            SeekFrom::Current(offset) => match self.seek_relative(offset) {
-                Some(cur) => Ok(cur),
-                None => {
-                    self.inner_seek(SeekFrom::Current(offset - 
self.unconsumed_buffer_len()))
-                        .await
-                }
-            },
-            SeekFrom::End(_) => self.inner_seek(pos).await,
-        }
-    }
-
     async fn read(&mut self, limit: usize) -> Result<Bytes> {
         if limit == 0 {
             return Ok(Bytes::new());
@@ -190,6 +165,30 @@ where
         self.consume(bytes.len());
         Ok(bytes)
     }
+
+    async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
+        match pos {
+            SeekFrom::Start(new_pos) => {
+                // TODO(weny): Check the overflowing.
+                let Some(offset) = (new_pos as i64).checked_sub(self.cur as 
i64) else {
+                    return self.inner_seek(pos).await;
+                };
+
+                match self.seek_relative(offset) {
+                    Some(cur) => Ok(cur),
+                    None => self.inner_seek(pos).await,
+                }
+            }
+            SeekFrom::Current(offset) => match self.seek_relative(offset) {
+                Some(cur) => Ok(cur),
+                None => {
+                    self.inner_seek(SeekFrom::Current(offset - 
self.unconsumed_buffer_len()))
+                        .await
+                }
+            },
+            SeekFrom::End(_) => self.inner_seek(pos).await,
+        }
+    }
 }
 
 impl<R> BufferReader<R>
@@ -210,11 +209,12 @@ where
             let mut buf = ReadBuf::uninit(dst);
             unsafe { buf.assume_init(cap) };
 
-            let n = self.r.read(buf.initialized_mut())?;
-            unsafe { self.buf.set_len(n) }
+            let bs = self.r.read(cap)?;
+            buf.put_slice(&bs);
+            unsafe { self.buf.set_len(bs.len()) }
 
             self.pos = 0;
-            self.filled = n;
+            self.filled = bs.len();
         }
 
         Ok(&self.buf[self.pos..self.filled])
@@ -233,32 +233,35 @@ impl<R> BlockingRead for BufferReader<R>
 where
     R: BlockingRead,
 {
-    fn read(&mut self, mut dst: &mut [u8]) -> Result<usize> {
-        // Sanity check for normal cases.
-        if dst.is_empty() {
-            return Ok(0);
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
+        if limit == 0 {
+            return Ok(Bytes::new());
         }
 
         // If we don't have any buffered data and we're doing a massive read
         // (larger than our internal buffer), bypass our internal buffer
         // entirely.
-        if self.pos == self.filled && dst.len() >= self.capacity() {
-            let res = self.r.read(dst);
+        if self.pos == self.filled && limit >= self.capacity() {
+            let res = self.r.read(limit);
             self.discard_buffer();
             return match res {
-                Ok(nread) => {
-                    self.cur += nread as u64;
-                    Ok(nread)
+                Ok(bs) => {
+                    self.cur += bs.len() as u64;
+                    Ok(bs)
                 }
                 Err(err) => Err(err),
             };
         }
 
-        let rem = self.blocking_fill_buf()?;
-        let amt = min(rem.len(), dst.len());
-        dst.put(&rem[..amt]);
-        self.consume(amt);
-        Ok(amt)
+        let bytes = self.blocking_fill_buf()?;
+
+        if bytes.is_empty() {
+            return Ok(Bytes::new());
+        }
+        let size = min(bytes.len(), limit);
+        let bytes = Bytes::copy_from_slice(&bytes[..size]);
+        self.consume(bytes.len());
+        Ok(bytes)
     }
 
     fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
@@ -282,21 +285,6 @@ where
             SeekFrom::End(_) => self.blocking_inner_seek(pos),
         }
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        match self.blocking_fill_buf() {
-            Ok(bytes) => {
-                if bytes.is_empty() {
-                    return None;
-                }
-
-                let bytes = Bytes::copy_from_slice(bytes);
-                self.consume(bytes.len());
-                Some(Ok(bytes))
-            }
-            Err(err) => Some(Err(err)),
-        }
-    }
 }
 
 #[cfg(test)]
@@ -397,8 +385,8 @@ mod tests {
     }
 
     impl BlockingRead for MockReader {
-        fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-            self.inner.read(buf)
+        fn read(&mut self, limit: usize) -> Result<Bytes> {
+            self.inner.read(limit)
         }
 
         fn seek(&mut self, _pos: SeekFrom) -> Result<u64> {
@@ -407,10 +395,6 @@ mod tests {
                 "output reader doesn't support seeking",
             ))
         }
-
-        fn next(&mut self) -> Option<Result<Bytes>> {
-            self.inner.next()
-        }
     }
 
     #[tokio::test]
@@ -634,20 +618,17 @@ mod tests {
         let r = Box::new(BufferReader::new(r, buf_cap)) as oio::BlockingReader;
         let mut r = BlockingReader::new(r);
 
-        let mut dst = [0u8; 5];
-        let nread = r.read(&mut dst)?;
-        assert_eq!(nread, dst.len());
-        assert_eq!(&dst, b"Hello");
+        let buf = r.read(5)?;
+        assert_eq!(buf.len(), 5);
+        assert_eq!(buf.as_ref(), b"Hello");
 
-        let mut dst = [0u8; 5];
-        let nread = r.read(&mut dst)?;
-        assert_eq!(nread, dst.len());
-        assert_eq!(&dst, b", Wor");
+        let buf = r.read(5)?;
+        assert_eq!(buf.len(), 5);
+        assert_eq!(buf.as_ref(), b", Wor");
 
-        let mut dst = [0u8; 3];
-        let nread = r.read(&mut dst)?;
-        assert_eq!(nread, dst.len());
-        assert_eq!(&dst, b"ld!");
+        let buf = r.read(3)?;
+        assert_eq!(buf.len(), 3);
+        assert_eq!(buf.as_ref(), b"ld!");
 
         Ok(())
     }
@@ -661,33 +642,29 @@ mod tests {
         let mut r = BlockingReader::new(r);
 
         // The underlying reader buffers the b"Hello, Wor".
-        let mut dst = [0u8; 5];
-        let nread = r.read(&mut dst)?;
-        assert_eq!(nread, dst.len());
-        assert_eq!(&dst, b"Hello");
+        let buf = r.read(5)?;
+        assert_eq!(buf.len(), 5);
+        assert_eq!(buf.as_ref(), b"Hello");
 
         let pos = r.seek(SeekFrom::Start(7))?;
         assert_eq!(pos, 7);
-        let mut dst = [0u8; 5];
-        let nread = r.read(&mut dst)?;
-        assert_eq!(&dst[..nread], &bs[7..10]);
-        assert_eq!(nread, 3);
+        let buf = r.read(5)?;
+        assert_eq!(&buf[..], &bs[7..10]);
+        assert_eq!(buf.len(), 3);
 
         // Should perform a relative seek.
         let pos = r.seek(SeekFrom::Start(0))?;
         assert_eq!(pos, 0);
-        let mut dst = [0u8; 9];
-        let nread = r.read(&mut dst)?;
-        assert_eq!(&dst[..nread], &bs[0..9]);
-        assert_eq!(nread, 9);
+        let buf = r.read(9)?;
+        assert_eq!(&buf[..], &bs[0..9]);
+        assert_eq!(buf.len(), 9);
 
         // Should perform a non-relative seek.
         let pos = r.seek(SeekFrom::Start(11))?;
         assert_eq!(pos, 11);
-        let mut dst = [0u8; 9];
-        let nread = r.read(&mut dst)?;
-        assert_eq!(&dst[..nread], &bs[11..13]);
-        assert_eq!(nread, 2);
+        let buf = r.read(9)?;
+        assert_eq!(&buf[..], &bs[11..13]);
+        assert_eq!(buf.len(), 2);
 
         Ok(())
     }
@@ -734,9 +711,8 @@ mod tests {
 
         let mut cur = 0;
         for _ in 0..3 {
-            let mut dst = [0u8; 5];
-            let nread = r.read(&mut dst)?;
-            assert_eq!(nread, 5);
+            let bs = r.read(5)?;
+            assert_eq!(bs.len(), 5);
             cur += 5;
         }
 
@@ -757,9 +733,8 @@ mod tests {
 
         let mut cur = 0;
         for _ in 0..3 {
-            let mut dst = [0u8; 6];
-            let nread = r.read(&mut dst)?;
-            assert_eq!(nread, 6);
+            let bs = r.read(6)?;
+            assert_eq!(bs.len(), 6);
             cur += 6;
         }
 
@@ -771,8 +746,6 @@ mod tests {
 
     #[tokio::test]
     async fn test_blocking_read_part() -> anyhow::Result<()> {
-        use std::io::Read;
-
         let (bs, _) = gen_bytes();
         let acc = Arc::new(MockReadService::new(bs.clone()));
         let r = Box::new(RangeReader::new(
@@ -784,7 +757,7 @@ mod tests {
         let mut r = BlockingReader::new(r);
 
         let mut buf = Vec::new();
-        BlockingRead::read_to_end(&mut r, &mut buf)?;
+        r.read_to_end(&mut buf)?;
         assert_eq!(4096, buf.len(), "read size");
         assert_eq!(
             format!("{:x}", Sha256::digest(&bs[4096..4096 + 4096])),
@@ -796,7 +769,7 @@ mod tests {
         assert_eq!(n, 0, "seek position must be 0");
 
         let mut buf = Vec::new();
-        BlockingRead::read_to_end(&mut r, &mut buf)?;
+        r.read_to_end(&mut buf)?;
         assert_eq!(4096, buf.len(), "read twice size");
         assert_eq!(
             format!("{:x}", Sha256::digest(&bs[4096..4096 + 4096])),
@@ -807,8 +780,7 @@ mod tests {
         let n = r.seek(SeekFrom::Start(1024))?;
         assert_eq!(1024, n, "seek to 1024");
 
-        let mut buf = vec![0; 1024];
-        r.read_exact(&mut buf)?;
+        let buf = r.read_exact(1024)?;
         assert_eq!(
             format!("{:x}", Sha256::digest(&bs[4096 + 1024..4096 + 2048])),
             format!("{:x}", Sha256::digest(&buf)),
@@ -818,8 +790,7 @@ mod tests {
         let n = r.seek(SeekFrom::Current(1024))?;
         assert_eq!(3072, n, "seek to 3072");
 
-        let mut buf = vec![0; 1024];
-        r.read_exact(&mut buf)?;
+        let buf = r.read_exact(1024)?;
         assert_eq!(
             format!("{:x}", Sha256::digest(&bs[4096 + 3072..4096 + 3072 + 
1024])),
             format!("{:x}", Sha256::digest(&buf)),
diff --git a/core/src/raw/oio/read/file_read.rs 
b/core/src/raw/oio/read/file_read.rs
index 6a27733634..64596c754a 100644
--- a/core/src/raw/oio/read/file_read.rs
+++ b/core/src/raw/oio/read/file_read.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::cmp;
 use std::io::SeekFrom;
 use std::sync::Arc;
 
@@ -40,7 +39,6 @@ pub struct FileReader<A: Accessor, R> {
     cur: u64,
 
     reader: Option<R>,
-    buf: oio::AdaptiveBuf,
     /// Do we need to reset our cursor?
     seek_dirty: bool,
 }
@@ -63,7 +61,6 @@ where
             offset: None,
             size: None,
             cur: 0,
-            buf: oio::AdaptiveBuf::default(),
             reader: None,
             seek_dirty: false,
         }
@@ -213,6 +210,24 @@ where
     A: Accessor<Reader = R>,
     R: oio::Read,
 {
+    async fn read(&mut self, limit: usize) -> Result<Bytes> {
+        if self.reader.is_none() {
+            // FileReader doesn't support range, we will always use full range 
to open a file.
+            let op = self.op.clone().with_range(BytesRange::from(..));
+            let (_, r) = self.acc.read(&self.path, op).await?;
+            self.reader = Some(r);
+        }
+
+        let r = self.reader.as_mut().expect("reader must be valid");
+
+        // We should know where to start read the data.
+        if self.offset.is_none() {
+            (self.offset, self.size) = Self::offset(r, self.op.range()).await?;
+        }
+
+        r.read(limit).await
+    }
+
     async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
         if self.reader.is_none() {
             // FileReader doesn't support range, we will always use full range 
to open a file.
@@ -245,24 +260,6 @@ where
         self.cur = pos - self.offset.unwrap();
         Ok(self.cur)
     }
-
-    async fn read(&mut self, limit: usize) -> Result<Bytes> {
-        if self.reader.is_none() {
-            // FileReader doesn't support range, we will always use full range 
to open a file.
-            let op = self.op.clone().with_range(BytesRange::from(..));
-            let (_, r) = self.acc.read(&self.path, op).await?;
-            self.reader = Some(r);
-        }
-
-        let r = self.reader.as_mut().expect("reader must be valid");
-
-        // We should know where to start read the data.
-        if self.offset.is_none() {
-            (self.offset, self.size) = Self::offset(r, self.op.range()).await?;
-        }
-
-        r.read(limit).await
-    }
 }
 
 impl<A, R> oio::BlockingRead for FileReader<A, R>
@@ -270,7 +267,7 @@ where
     A: Accessor<BlockingReader = R>,
     R: oio::BlockingRead,
 {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
         if self.reader.is_none() {
             // FileReader doesn't support range, we will always use full range 
to open a file.
             let op = self.op.clone().with_range(BytesRange::from(..));
@@ -285,25 +282,7 @@ where
             (self.offset, self.size) = Self::calculate_offset(r, 
self.op.range())?;
         }
 
-        let size = if let Some(size) = self.size {
-            // Sanity check.
-            if self.cur >= size {
-                return Ok(0);
-            }
-            cmp::min(buf.len(), (size - self.cur) as usize)
-        } else {
-            buf.len()
-        };
-
-        match r.read(&mut buf[..size]) {
-            Ok(0) => Ok(0),
-            Ok(n) => {
-                self.cur += n as u64;
-                Ok(n)
-            }
-            // We don't need to reset state here since it's ok to poll the 
same reader.
-            Err(err) => Err(err),
-        }
+        r.read(limit)
     }
 
     fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
@@ -337,52 +316,4 @@ where
         self.cur = pos - self.offset.unwrap();
         Ok(self.cur)
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        if self.reader.is_none() {
-            // FileReader doesn't support range, we will always use full range 
to open a file.
-            let op = self.op.clone().with_range(BytesRange::from(..));
-            let (_, r) = match self.acc.blocking_read(&self.path, op) {
-                Ok(v) => v,
-                Err(err) => return Some(Err(err)),
-            };
-            self.reader = Some(r);
-        }
-
-        let r = self.reader.as_mut().expect("reader must be valid");
-
-        // We should know where to start read the data.
-        if self.offset.is_none() {
-            (self.offset, self.size) = match Self::calculate_offset(r, 
self.op.range()) {
-                Ok(v) => v,
-                Err(err) => return Some(Err(err)),
-            }
-        }
-
-        self.buf.reserve();
-
-        let mut buf = self.buf.initialized_mut();
-        let buf = buf.initialized_mut();
-
-        let size = if let Some(size) = self.size {
-            // Sanity check.
-            if self.cur >= size {
-                return None;
-            }
-            cmp::min(buf.len(), (size - self.cur) as usize)
-        } else {
-            buf.len()
-        };
-
-        match r.read(&mut buf[..size]) {
-            Ok(0) => None,
-            Ok(n) => {
-                self.cur += n as u64;
-                self.buf.record(n);
-                Some(Ok(self.buf.split(n)))
-            }
-            // We don't need to reset state here since it's ok to poll the 
same reader.
-            Err(err) => Some(Err(err)),
-        }
-    }
 }
diff --git a/core/src/raw/oio/read/into_streamable_read.rs 
b/core/src/raw/oio/read/into_streamable_read.rs
index e234eb654c..6fe21a2c60 100644
--- a/core/src/raw/oio/read/into_streamable_read.rs
+++ b/core/src/raw/oio/read/into_streamable_read.rs
@@ -28,7 +28,6 @@ use crate::*;
 pub fn into_streamable_read<R>(r: R, capacity: usize) -> StreamableReader<R> {
     StreamableReader {
         r,
-        cap: capacity,
         buf: Vec::with_capacity(capacity),
     }
 }
@@ -36,7 +35,6 @@ pub fn into_streamable_read<R>(r: R, capacity: usize) -> 
StreamableReader<R> {
 /// Make given read streamable.
 pub struct StreamableReader<R> {
     r: R,
-    cap: usize,
     buf: Vec<u8>,
 }
 
@@ -63,28 +61,13 @@ impl<R: oio::Read> oio::Read for StreamableReader<R> {
 }
 
 impl<R: oio::BlockingRead> oio::BlockingRead for StreamableReader<R> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        self.r.read(buf)
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
+        self.r.read(limit)
     }
 
     fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
         self.r.seek(pos)
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        let dst = self.buf.spare_capacity_mut();
-        let mut buf = ReadBuf::uninit(dst);
-        unsafe { buf.assume_init(self.cap) };
-
-        match self.r.read(buf.initialized_mut()) {
-            Err(err) => Some(Err(err)),
-            Ok(0) => None,
-            Ok(n) => {
-                buf.set_filled(n);
-                Some(Ok(Bytes::from(buf.filled().to_vec())))
-            }
-        }
-    }
 }
 
 #[cfg(test)]
@@ -135,10 +118,13 @@ mod tests {
         let r = oio::Cursor::from(content.clone());
         let mut s = into_streamable_read(Box::new(r) as oio::BlockingReader, 
cap);
 
-        let mut bs = BytesMut::new();
-        while let Some(b) = s.next() {
-            let b = b.expect("read must success");
-            bs.put_slice(&b);
+        let mut bs = BytesMut::with_capacity(size);
+        loop {
+            let buf = s.read(size).expect("read must success");
+            if buf.is_empty() {
+                break;
+            }
+            bs.put_slice(&buf)
         }
         assert_eq!(bs.freeze().to_vec(), content)
     }
diff --git a/core/src/raw/oio/read/lazy_read.rs 
b/core/src/raw/oio/read/lazy_read.rs
index 89be1a3b6f..1fd1c71b59 100644
--- a/core/src/raw/oio/read/lazy_read.rs
+++ b/core/src/raw/oio/read/lazy_read.rs
@@ -99,20 +99,11 @@ where
     A: Accessor<BlockingReader = R>,
     R: oio::BlockingRead,
 {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        self.blocking_reader()?.read(buf)
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
+        self.blocking_reader()?.read(limit)
     }
 
     fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
         self.blocking_reader()?.seek(pos)
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        let r = match self.blocking_reader() {
-            Ok(r) => r,
-            Err(err) => return Some(Err(err)),
-        };
-
-        r.next()
-    }
 }
diff --git a/core/src/raw/oio/read/range_read.rs 
b/core/src/raw/oio/read/range_read.rs
index d0551f86d2..56289d5341 100644
--- a/core/src/raw/oio/read/range_read.rs
+++ b/core/src/raw/oio/read/range_read.rs
@@ -252,6 +252,43 @@ where
     A: Accessor<Reader = R>,
     R: oio::Read,
 {
+    async fn read(&mut self, limit: usize) -> Result<Bytes> {
+        // Sanity check for normal cases.
+        if self.cur >= self.size.unwrap_or(u64::MAX) {
+            return Ok(Bytes::new());
+        }
+
+        if self.offset.is_none() {
+            let rp = match self.stat_future().await {
+                Ok(v) => v,
+                Err(err) => return Err(err),
+            };
+            let length = rp.into_metadata().content_length();
+            self.ensure_offset(length)?
+        }
+        if self.reader.is_none() {
+            let (rp, r) = match self.read_future().await {
+                Ok((rp, r)) => (rp, r),
+                Err(err) => return Err(err),
+            };
+
+            self.ensure_size(rp.range().unwrap_or_default().size(), rp.size());
+            self.reader = Some(r);
+        }
+
+        let r = self.reader.as_mut().expect("reader must be valid");
+        match r.read(limit).await {
+            Ok(bs) => {
+                self.cur += bs.len() as u64;
+                Ok(bs)
+            }
+            Err(err) => {
+                self.reader = None;
+                Err(err)
+            }
+        }
+    }
+
     async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
         // There is an optimization here that we can calculate if users trying 
to seek
         // the same position, for example, `reader.seek(SeekFrom::Current(0))`.
@@ -292,15 +329,21 @@ where
         self.cur = seek_pos;
         Ok(self.cur)
     }
+}
 
-    async fn read(&mut self, limit: usize) -> Result<Bytes> {
+impl<A, R> oio::BlockingRead for RangeReader<A, R>
+where
+    A: Accessor<BlockingReader = R>,
+    R: oio::BlockingRead,
+{
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
         // Sanity check for normal cases.
         if self.cur >= self.size.unwrap_or(u64::MAX) {
             return Ok(Bytes::new());
         }
 
         if self.offset.is_none() {
-            let rp = match self.stat_future().await {
+            let rp = match self.stat_action() {
                 Ok(v) => v,
                 Err(err) => return Err(err),
             };
@@ -308,7 +351,7 @@ where
             self.ensure_offset(length)?
         }
         if self.reader.is_none() {
-            let (rp, r) = match self.read_future().await {
+            let (rp, r) = match self.read_action() {
                 Ok((rp, r)) => (rp, r),
                 Err(err) => return Err(err),
             };
@@ -318,7 +361,7 @@ where
         }
 
         let r = self.reader.as_mut().expect("reader must be valid");
-        match r.read(limit).await {
+        match r.read(limit) {
             Ok(bs) => {
                 self.cur += bs.len() as u64;
                 Ok(bs)
@@ -329,48 +372,6 @@ where
             }
         }
     }
-}
-
-impl<A, R> oio::BlockingRead for RangeReader<A, R>
-where
-    A: Accessor<BlockingReader = R>,
-    R: oio::BlockingRead,
-{
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        // Sanity check for normal cases.
-        if buf.is_empty() || self.cur >= self.size.unwrap_or(u64::MAX) {
-            return Ok(0);
-        }
-
-        if self.offset.is_none() {
-            let rp = self.stat_action()?;
-            let length = rp.into_metadata().content_length();
-            self.ensure_offset(length)?;
-        }
-        if self.reader.is_none() {
-            let (rp, r) = self.read_action()?;
-
-            self.ensure_size(rp.range().unwrap_or_default().size(), rp.size());
-            self.reader = Some(r);
-        }
-
-        let r = self.reader.as_mut().expect("reader must be valid");
-        match r.read(buf) {
-            Ok(0) => {
-                // Reset state to Idle after all data has been consumed.
-                self.reader = None;
-                Ok(0)
-            }
-            Ok(n) => {
-                self.cur += n as u64;
-                Ok(n)
-            }
-            Err(e) => {
-                self.reader = None;
-                Err(e)
-            }
-        }
-    }
 
     fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
         // There is an optimization here that we can calculate if users trying 
to seek
@@ -412,49 +413,6 @@ where
         self.cur = seek_pos;
         Ok(self.cur)
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        // Sanity check for normal cases.
-        if self.cur >= self.size.unwrap_or(u64::MAX) {
-            return None;
-        }
-
-        if self.offset.is_none() {
-            let rp = match self.stat_action() {
-                Ok(rp) => rp,
-                Err(err) => return Some(Err(err)),
-            };
-            let length = rp.into_metadata().content_length();
-            if let Err(err) = self.ensure_offset(length) {
-                return Some(Err(err));
-            }
-        }
-        if self.reader.is_none() {
-            let (rp, r) = match self.read_action() {
-                Ok((rp, r)) => (rp, r),
-                Err(err) => return Some(Err(err)),
-            };
-
-            self.ensure_size(rp.range().unwrap_or_default().size(), rp.size());
-            self.reader = Some(r);
-        }
-
-        let r = self.reader.as_mut().expect("reader must be valid");
-        match r.next() {
-            Some(Ok(bs)) => {
-                self.cur += bs.len() as u64;
-                Some(Ok(bs))
-            }
-            Some(Err(err)) => {
-                self.reader = None;
-                Some(Err(err))
-            }
-            None => {
-                self.reader = None;
-                None
-            }
-        }
-    }
 }
 
 #[cfg(test)]
diff --git a/core/src/raw/oio/read/std_read.rs 
b/core/src/raw/oio/read/std_read.rs
index d28f8d5ca3..8726ebf249 100644
--- a/core/src/raw/oio/read/std_read.rs
+++ b/core/src/raw/oio/read/std_read.rs
@@ -15,11 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use bytes::Bytes;
 use std::io::Read;
 use std::io::Seek;
 use std::io::SeekFrom;
-
-use bytes::Bytes;
+use tokio::io::ReadBuf;
 
 use crate::raw::*;
 use crate::*;
@@ -27,12 +27,16 @@ use crate::*;
 /// FuturesReader implements [`oio::BlockingRead`] via [`Read`] + [`Seek`].
 pub struct StdReader<R: Read + Seek> {
     inner: R,
+    buf: Vec<u8>,
 }
 
 impl<R: Read + Seek> StdReader<R> {
     /// Create a new std reader.
     pub fn new(inner: R) -> Self {
-        Self { inner }
+        Self {
+            inner,
+            buf: Vec::with_capacity(64 * 1024),
+        }
     }
 }
 
@@ -40,12 +44,27 @@ impl<R> oio::BlockingRead for StdReader<R>
 where
     R: Read + Seek + Send + Sync,
 {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        self.inner.read(buf).map_err(|err| {
+    fn read(&mut self, limit: usize) -> Result<Bytes> {
+        // Make sure buf has enough space.
+        if self.buf.capacity() < limit {
+            self.buf.reserve(limit);
+        }
+        let buf = self.buf.spare_capacity_mut();
+        let mut read_buf: ReadBuf = ReadBuf::uninit(buf);
+
+        // SAFETY: Read at most `size` bytes into `read_buf`.
+        unsafe {
+            read_buf.assume_init(limit);
+        }
+
+        let n = self.inner.read(read_buf.initialized_mut()).map_err(|err| {
             new_std_io_error(err)
-                .with_operation(oio::ReadOperation::BlockingRead)
-                .with_context("source", "StdReader")
-        })
+                .with_operation(oio::ReadOperation::Read)
+                .with_context("source", "TokioReader")
+        })?;
+        read_buf.set_filled(n);
+
+        Ok(Bytes::copy_from_slice(read_buf.filled()))
     }
 
     fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
@@ -55,11 +74,4 @@ where
                 .with_context("source", "StdReader")
         })
     }
-
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        Some(Err(Error::new(
-            ErrorKind::Unsupported,
-            "StdReader doesn't support poll_next",
-        )))
-    }
 }
diff --git a/core/src/raw/oio/read/tokio_read.rs 
b/core/src/raw/oio/read/tokio_read.rs
index 6dea5cec39..f89e2ec5f3 100644
--- a/core/src/raw/oio/read/tokio_read.rs
+++ b/core/src/raw/oio/read/tokio_read.rs
@@ -47,14 +47,6 @@ impl<R> oio::Read for TokioReader<R>
 where
     R: AsyncRead + AsyncSeek + Unpin + Send + Sync,
 {
-    async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
-        self.inner.seek(pos).await.map_err(|err| {
-            new_std_io_error(err)
-                .with_operation(oio::ReadOperation::Seek)
-                .with_context("source", "TokioReader")
-        })
-    }
-
     async fn read(&mut self, limit: usize) -> Result<Bytes> {
         // Make sure buf has enough space.
         if self.buf.capacity() < limit {
@@ -81,4 +73,12 @@ where
 
         Ok(Bytes::copy_from_slice(read_buf.filled()))
     }
+
+    async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
+        self.inner.seek(pos).await.map_err(|err| {
+            new_std_io_error(err)
+                .with_operation(oio::ReadOperation::Seek)
+                .with_context("source", "TokioReader")
+        })
+    }
 }
diff --git a/core/src/raw/tests/read.rs b/core/src/raw/tests/read.rs
index 49b44b3682..b2503901a3 100644
--- a/core/src/raw/tests/read.rs
+++ b/core/src/raw/tests/read.rs
@@ -179,16 +179,11 @@ impl ReadChecker {
         for action in actions {
             match action {
                 ReadAction::Read(size) => {
-                    use oio::BlockingRead;
-
-                    let mut buf = vec![0; *size];
-                    let n = r.read(&mut buf).expect("read must success");
-                    self.check_read(*size, &buf[..n]);
+                    let bs = r.read(*size).expect("read must success");
+                    self.check_read(*size, &bs);
                 }
 
                 ReadAction::Seek(pos) => {
-                    use oio::BlockingRead;
-
                     let res = r.seek(*pos);
                     self.check_seek(*pos, res);
                 }
diff --git a/core/src/types/operator/blocking_operator.rs 
b/core/src/types/operator/blocking_operator.rs
index 9cb46efa30..662f3d770a 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -18,7 +18,6 @@
 use bytes::Bytes;
 
 use super::operator_functions::*;
-use crate::raw::oio::BlockingRead;
 use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
@@ -410,9 +409,10 @@ impl BlockingOperator {
                     (range.size().unwrap(), range)
                 };
 
-                let (_, mut s) = inner.blocking_read(&path, 
args.with_range(range))?;
+                let (_, r) = inner.blocking_read(&path, 
args.with_range(range))?;
+                let mut r = BlockingReader::new(r);
                 let mut buf = Vec::with_capacity(size_hint as usize);
-                s.read_to_end(&mut buf)?;
+                r.read_to_end(&mut buf)?;
 
                 Ok(buf)
             },
diff --git a/core/src/types/reader.rs b/core/src/types/reader.rs
index 2af5ca5144..8db4e9f9f4 100644
--- a/core/src/types/reader.rs
+++ b/core/src/types/reader.rs
@@ -26,6 +26,7 @@ use bytes::{BufMut, Bytes, BytesMut};
 use futures::Stream;
 use tokio::io::ReadBuf;
 
+use crate::raw::oio::BlockingRead;
 use crate::raw::*;
 use crate::*;
 
@@ -441,33 +442,107 @@ impl BlockingReader {
     }
 
     /// Create a new reader from an `oio::BlockingReader`.
-    #[cfg(test)]
     pub(crate) fn new(r: oio::BlockingReader) -> Self {
         BlockingReader { inner: r }
     }
-}
 
-impl oio::BlockingRead for BlockingReader {
+    /// Seek to the position of `pos` of reader.
     #[inline]
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        self.inner.read(buf)
+    pub fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
+        self.inner.seek(pos)
     }
 
+    /// Read at most `size` bytes of data from reader.
     #[inline]
-    fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
-        self.inner.seek(pos)
+    pub fn read(&mut self, limit: usize) -> Result<Bytes> {
+        self.inner.read(limit)
     }
 
-    #[inline]
-    fn next(&mut self) -> Option<Result<Bytes>> {
-        oio::BlockingRead::next(&mut self.inner)
+    /// Read exact `size` bytes of data from reader.
+    pub fn read_exact(&mut self, size: usize) -> Result<Bytes> {
+        // Lucky path.
+        let bs1 = self.inner.read(size)?;
+        debug_assert!(
+            bs1.len() <= size,
+            "read should not return more bytes than expected"
+        );
+        if bs1.len() == size {
+            return Ok(bs1);
+        }
+        if bs1.is_empty() {
+            return Err(
+                Error::new(ErrorKind::ContentIncomplete, "reader got too 
little data")
+                    .with_context("expect", size.to_string()),
+            );
+        }
+
+        let mut bs = BytesMut::with_capacity(size);
+        bs.put_slice(&bs1);
+
+        let mut remaining = size - bs.len();
+
+        loop {
+            let tmp = self.inner.read(remaining)?;
+            if tmp.is_empty() {
+                return Err(
+                    Error::new(ErrorKind::ContentIncomplete, "reader got too 
little data")
+                        .with_context("expect", size.to_string())
+                        .with_context("actual", bs.len().to_string()),
+                );
+            }
+            bs.put_slice(&tmp);
+            debug_assert!(
+                tmp.len() <= remaining,
+                "read should not return more bytes than expected"
+            );
+
+            remaining -= tmp.len();
+            if remaining == 0 {
+                break;
+            }
+        }
+
+        Ok(bs.freeze())
+    }
+    /// Reads all bytes until EOF in this source, placing them into buf.
+    pub fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
+        let start_len = buf.len();
+
+        loop {
+            if buf.len() == buf.capacity() {
+                buf.reserve(32); // buf is full, need more space
+            }
+
+            let spare = buf.spare_capacity_mut();
+            let mut read_buf: ReadBuf = ReadBuf::uninit(spare);
+
+            // SAFETY: These bytes were initialized but not filled in the 
previous loop
+            unsafe {
+                read_buf.assume_init(read_buf.capacity());
+            }
+
+            match self.read(read_buf.initialized_mut().len()) {
+                Ok(bs) if bs.is_empty() => return Ok(buf.len() - start_len),
+                Ok(bs) => {
+                    
read_buf.initialized_mut()[..bs.len()].copy_from_slice(&bs);
+
+                    // SAFETY: Read API makes sure that returning `n` is 
correct.
+                    unsafe {
+                        buf.set_len(buf.len() + bs.len());
+                    }
+                }
+                Err(e) => return Err(e),
+            }
+        }
     }
 }
 
 impl io::Read for BlockingReader {
     #[inline]
     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        self.inner.read(buf).map_err(format_std_io_error)
+        let bs = self.inner.read(buf.len()).map_err(format_std_io_error)?;
+        buf[..bs.len()].copy_from_slice(&bs);
+        Ok(bs.len())
     }
 }
 
@@ -483,9 +558,15 @@ impl Iterator for BlockingReader {
 
     #[inline]
     fn next(&mut self) -> Option<Self::Item> {
-        self.inner
-            .next()
-            .map(|v| v.map_err(|err| 
io::Error::new(io::ErrorKind::Interrupted, err)))
+        match self
+            .inner
+            .read(4 * 1024 * 1024)
+            .map_err(format_std_io_error)
+        {
+            Ok(bs) if bs.is_empty() => None,
+            Ok(bs) => Some(Ok(bs)),
+            Err(err) => Some(Err(err)),
+        }
     }
 }
 

Reply via email to