This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a6d1f64d feat: Improve the read_to_end perf and add benchmark vs_fs 
(#3617)
4a6d1f64d is described below

commit 4a6d1f64d4e545366008aac825384925e4d6d338
Author: Xuanwo <[email protected]>
AuthorDate: Sun Nov 19 00:03:49 2023 +0800

    feat: Improve the read_to_end perf and add benchmark vs_fs (#3617)
    
    Signed-off-by: Xuanwo <[email protected]>
---
 Cargo.lock                                   | 11 +++++
 Cargo.toml                                   |  1 +
 core/Cargo.toml                              |  2 +
 core/benches/oio/main.rs                     |  2 +-
 core/benches/vs_fs/Cargo.toml                | 36 ++++++++++++++
 core/benches/vs_fs/README.md                 | 35 +++++++++++++
 core/benches/vs_fs/src/main.rs               | 74 ++++++++++++++++++++++++++++
 core/src/raw/oio/read/api.rs                 | 64 ++++++++++++------------
 core/src/types/operator/blocking_operator.rs | 18 +++++--
 9 files changed, 208 insertions(+), 35 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 02bf97e81..662052bd2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4250,6 +4250,17 @@ dependencies = [
  "wiremock",
 ]
 
+[[package]]
+name = "opendal-benchmark-vs-fs"
+version = "0.0.0"
+dependencies = [
+ "criterion",
+ "opendal",
+ "rand 0.8.5",
+ "tokio",
+ "uuid",
+]
+
 [[package]]
 name = "opendal-c"
 version = "0.42.0"
diff --git a/Cargo.toml b/Cargo.toml
index 1fa2f169f..6ec1b15dd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,6 +22,7 @@ members = [
   "core",
   "core/fuzz",
   "core/edge/*",
+  "core/benches/vs_fs",
 
   "bindings/c",
   "bindings/nodejs",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 7a77484e2..11d8cf96a 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -201,10 +201,12 @@ bench = false
 [[bench]]
 harness = false
 name = "ops"
+required-features = ["tests"]
 
 [[bench]]
 harness = false
 name = "oio"
+required-features = ["tests"]
 
 [[test]]
 harness = false
diff --git a/core/benches/oio/main.rs b/core/benches/oio/main.rs
index 85ca2ebbe..41ae52f5d 100644
--- a/core/benches/oio/main.rs
+++ b/core/benches/oio/main.rs
@@ -21,5 +21,5 @@ mod write;
 use criterion::criterion_group;
 use criterion::criterion_main;
 
-criterion_group!(benches, write::bench_exact_buf_write,);
+criterion_group!(benches, write::bench_exact_buf_write);
 criterion_main!(benches);
diff --git a/core/benches/vs_fs/Cargo.toml b/core/benches/vs_fs/Cargo.toml
new file mode 100644
index 000000000..7a98ea846
--- /dev/null
+++ b/core/benches/vs_fs/Cargo.toml
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "opendal-benchmark-vs-fs"
+description = "OpenDAL Benchmark vs fs"
+version = "0.0.0"
+publish = false
+
+authors.workspace = true
+edition.workspace = true
+homepage.workspace = true
+license.workspace = true
+repository.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+opendal = { path = "../..", features = ["tests"] }
+tokio = { version = "1", features = ["full"] }
+uuid = { version = "1", features = ["v4"] }
+criterion = { version = "0.4", features = ["async", "async_tokio"] }
+rand = "0.8"
diff --git a/core/benches/vs_fs/README.md b/core/benches/vs_fs/README.md
new file mode 100644
index 000000000..6ded9ac01
--- /dev/null
+++ b/core/benches/vs_fs/README.md
@@ -0,0 +1,35 @@
+# OpenDAL Benchmark VS Fs
+
+This benchmark compares the performance of OpenDAL with the performance of the 
`std::fs`.
+
+## Goal
+
+We expect OpenDAL to match `std::fs` in speed: the throughput of OpenDAL 
should be within a `5%` range of `std::fs`.
+
+## Usage
+
+For test: `cargo run`
+
+```shell
+Testing vs_fs/std_fs_read
+Success
+Testing vs_fs/opendal_fs_read
+Success
+Testing vs_fs/opendal_fs_read_with_range
+Success
+```
+
+For bench: `cargo run --release -- --bench`
+
+```shell
+read/std_fs       time:   [749.57 µs 762.69 µs 777.07 µs]
+                  thrpt:  [20.108 GiB/s 20.487 GiB/s 20.845 GiB/s]
+                        
+read/opendal_fs   time:   [750.90 µs 755.39 µs 760.49 µs]
+                  thrpt:  [20.546 GiB/s 20.685 GiB/s 20.808 GiB/s]
+                        
+read/opendal_fs_with_range
+                  time:   [684.02 µs 690.77 µs 697.99 µs]
+                  thrpt:  [22.386 GiB/s 22.620 GiB/s 22.843 GiB/s]
+
+```
diff --git a/core/benches/vs_fs/src/main.rs b/core/benches/vs_fs/src/main.rs
new file mode 100644
index 000000000..7ec09fb02
--- /dev/null
+++ b/core/benches/vs_fs/src/main.rs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use criterion::Criterion;
+use opendal::services;
+use opendal::Operator;
+use rand::prelude::*;
+
+fn main() {
+    let mut c = Criterion::default().configure_from_args();
+    bench_vs_fs(&mut c);
+
+    c.final_summary();
+}
+
+fn bench_vs_fs(c: &mut Criterion) {
+    let mut cfg = services::Fs::default();
+    cfg.root("/tmp/opendal/");
+    let op = Operator::new(cfg).unwrap().finish().blocking();
+
+    let mut group = c.benchmark_group("read");
+    group.throughput(criterion::Throughput::Bytes(16 * 1024 * 1024));
+
+    group.bench_function("std_fs", |b| {
+        let path = format!("/tmp/opendal/{}", prepare());
+        b.iter(|| {
+            let _ = std::fs::read(&path).unwrap();
+        });
+    });
+    group.bench_function("opendal_fs", |b| {
+        let path = prepare();
+        b.iter(|| {
+            let _ = op.read(&path).unwrap();
+        });
+    });
+    group.bench_function("opendal_fs_with_range", |b| {
+        let path = prepare();
+        b.iter(|| {
+            let _ = op
+                .read_with(&path)
+                .range(0..16 * 1024 * 1024)
+                .call()
+                .unwrap();
+        });
+    });
+
+    group.finish()
+}
+
+fn prepare() -> String {
+    let mut rng = thread_rng();
+    let mut content = vec![0; 16 * 1024 * 1024];
+    rng.fill_bytes(&mut content);
+
+    let name = uuid::Uuid::new_v4();
+    let path = format!("/tmp/opendal/{}", name);
+    let _ = std::fs::write(path, content);
+
+    name.to_string()
+}
diff --git a/core/src/raw/oio/read/api.rs b/core/src/raw/oio/read/api.rs
index 2c25f9981..03a362c6d 100644
--- a/core/src/raw/oio/read/api.rs
+++ b/core/src/raw/oio/read/api.rs
@@ -27,6 +27,7 @@ use std::task::Poll;
 use bytes::Bytes;
 use futures::Future;
 use pin_project::pin_project;
+use tokio::io::ReadBuf;
 
 use crate::*;
 
@@ -359,46 +360,47 @@ pub trait BlockingRead: Send + Sync {
 
     /// Read all data of current reader to the end of buf.
     fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
-        let start = buf.len();
-        let mut next = MAX_READ_TO_END_GROW_SIZE;
-        let mut length = start;
+        let start_len = buf.len();
+        let start_cap = buf.capacity();
 
         loop {
-            if buf.capacity() == length {
-                buf.reserve(next);
-                // # Safety
-                //
-                // We make sure that the length of buf is maintained correctly.
-                #[allow(clippy::uninit_vec)]
-                unsafe {
-                    buf.set_len(buf.capacity());
-                }
+            if buf.len() == buf.capacity() {
+                buf.reserve(32); // buf is full, need more space
             }
 
-            let bs = &mut buf[length..];
-            match self.read(bs) {
-                Ok(0) => {
+            let spare = buf.spare_capacity_mut();
+            let mut read_buf: ReadBuf = ReadBuf::uninit(spare);
+
+            // SAFETY: These bytes were initialized but not filled in the 
previous loop
+            unsafe {
+                read_buf.assume_init(read_buf.capacity());
+            }
+
+            match self.read(read_buf.initialize_unfilled()) {
+                Ok(0) => return Ok(buf.len() - start_len),
+                Ok(n) => {
+                    // SAFETY: Read API makes sure that returning `n` is 
correct.
                     unsafe {
-                        buf.set_len(length);
+                        buf.set_len(buf.len() + n);
                     }
-                    return Ok(length - start);
                 }
-                Ok(n) => {
-                    next = if n >= next {
-                        cmp::min(next.saturating_mul(2), 
MAX_READ_TO_END_GROW_SIZE)
-                    } else if n >= next / 2 {
-                        next
-                    } else {
-                        cmp::max(next.saturating_div(2), 
MIN_READ_TO_END_GROW_SIZE)
-                    };
+                Err(e) => return Err(e),
+            }
 
-                    // We can't allow bogus values from read. If it is too 
large, the returned vec could have its length
-                    // set past its capacity, or if it overflows the vec could 
be shortened which could create an invalid
-                    // string if this is called via read_to_string.
-                    assert!(n <= buf.len());
-                    length += n;
+            // The buffer might be an exact fit. Let's read into a probe buffer
+            // and see if it returns `Ok(0)`. If so, we've avoided an
+            // unnecessary doubling of the capacity. But if not, append the
+            // probe buffer to the primary buffer and let its capacity grow.
+            if buf.len() == buf.capacity() && buf.capacity() == start_cap {
+                let mut probe = [0u8; 32];
+
+                match self.read(&mut probe) {
+                    Ok(0) => return Ok(buf.len() - start_len),
+                    Ok(n) => {
+                        buf.extend_from_slice(&probe[..n]);
+                    }
+                    Err(e) => return Err(e),
                 }
-                Err(e) => return Err(e),
             }
         }
     }
diff --git a/core/src/types/operator/blocking_operator.rs 
b/core/src/types/operator/blocking_operator.rs
index 2b1f4a456..5caed7534 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -18,8 +18,7 @@
 use bytes::Bytes;
 
 use super::operator_functions::*;
-use crate::raw::oio::BlockingRead;
-use crate::raw::oio::WriteBuf;
+use crate::raw::oio::{BlockingRead, WriteBuf};
 use crate::raw::*;
 use crate::*;
 
@@ -338,9 +337,22 @@ impl BlockingOperator {
                     );
                 }
 
+                let range = args.range();
+                let size_hint = match range.size() {
+                    Some(v) => v,
+                    None => {
+                        let mut size = inner
+                            .blocking_stat(&path, OpStat::default())?
+                            .into_metadata()
+                            .content_length();
+                        size -= range.offset().unwrap_or(0);
+                        size
+                    }
+                };
+
                 let (_, mut s) = inner.blocking_read(&path, args)?;
 
-                let mut buf = Vec::new();
+                let mut buf = Vec::with_capacity(size_hint as usize);
                 s.read_to_end(&mut buf)?;
 
                 Ok(buf)

Reply via email to