This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 4a6d1f64d feat: Improve the read_to_end perf and add benchmark vs_fs (#3617)
4a6d1f64d is described below
commit 4a6d1f64d4e545366008aac825384925e4d6d338
Author: Xuanwo <[email protected]>
AuthorDate: Sun Nov 19 00:03:49 2023 +0800
feat: Improve the read_to_end perf and add benchmark vs_fs (#3617)
Signed-off-by: Xuanwo <[email protected]>
---
Cargo.lock | 11 +++++
Cargo.toml | 1 +
core/Cargo.toml | 2 +
core/benches/oio/main.rs | 2 +-
core/benches/vs_fs/Cargo.toml | 36 ++++++++++++++
core/benches/vs_fs/README.md | 35 +++++++++++++
core/benches/vs_fs/src/main.rs | 74 ++++++++++++++++++++++++++++
core/src/raw/oio/read/api.rs | 64 ++++++++++++------------
core/src/types/operator/blocking_operator.rs | 18 +++++--
9 files changed, 208 insertions(+), 35 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 02bf97e81..662052bd2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4250,6 +4250,17 @@ dependencies = [
"wiremock",
]
+[[package]]
+name = "opendal-benchmark-vs-fs"
+version = "0.0.0"
+dependencies = [
+ "criterion",
+ "opendal",
+ "rand 0.8.5",
+ "tokio",
+ "uuid",
+]
+
[[package]]
name = "opendal-c"
version = "0.42.0"
diff --git a/Cargo.toml b/Cargo.toml
index 1fa2f169f..6ec1b15dd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,6 +22,7 @@ members = [
"core",
"core/fuzz",
"core/edge/*",
+ "core/benches/vs_fs",
"bindings/c",
"bindings/nodejs",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 7a77484e2..11d8cf96a 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -201,10 +201,12 @@ bench = false
[[bench]]
harness = false
name = "ops"
+required-features = ["tests"]
[[bench]]
harness = false
name = "oio"
+required-features = ["tests"]
[[test]]
harness = false
diff --git a/core/benches/oio/main.rs b/core/benches/oio/main.rs
index 85ca2ebbe..41ae52f5d 100644
--- a/core/benches/oio/main.rs
+++ b/core/benches/oio/main.rs
@@ -21,5 +21,5 @@ mod write;
use criterion::criterion_group;
use criterion::criterion_main;
-criterion_group!(benches, write::bench_exact_buf_write,);
+criterion_group!(benches, write::bench_exact_buf_write);
criterion_main!(benches);
diff --git a/core/benches/vs_fs/Cargo.toml b/core/benches/vs_fs/Cargo.toml
new file mode 100644
index 000000000..7a98ea846
--- /dev/null
+++ b/core/benches/vs_fs/Cargo.toml
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "opendal-benchmark-vs-fs"
+description = "OpenDAL Benchmark vs fs"
+version = "0.0.0"
+publish = false
+
+authors.workspace = true
+edition.workspace = true
+homepage.workspace = true
+license.workspace = true
+repository.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+opendal = { path = "../..", features = ["tests"] }
+tokio = { version = "1", features = ["full"] }
+uuid = { version = "1", features = ["v4"] }
+criterion = { version = "0.4", features = ["async", "async_tokio"] }
+rand = "0.8"
diff --git a/core/benches/vs_fs/README.md b/core/benches/vs_fs/README.md
new file mode 100644
index 000000000..6ded9ac01
--- /dev/null
+++ b/core/benches/vs_fs/README.md
@@ -0,0 +1,35 @@
+# OpenDAL Benchmark VS Fs
+
+This benchmark compares the performance of OpenDAL with the performance of the `std::fs`.
+
+## Goal
+
+We expect OpenDAL to match `std::fs` in speed: the throughput of OpenDAL should be within a `5%` range of `std::fs`.
+
+## Usage
+
+For test: `cargo run`
+
+```shell
+Testing vs_fs/std_fs_read
+Success
+Testing vs_fs/opendal_fs_read
+Success
+Testing vs_fs/opendal_fs_read_with_range
+Success
+```
+
+For bench: `cargo run --release -- --bench`
+
+```shell
+read/std_fs time: [749.57 µs 762.69 µs 777.07 µs]
+ thrpt: [20.108 GiB/s 20.487 GiB/s 20.845 GiB/s]
+
+read/opendal_fs time: [750.90 µs 755.39 µs 760.49 µs]
+ thrpt: [20.546 GiB/s 20.685 GiB/s 20.808 GiB/s]
+
+read/opendal_fs_with_range
+ time: [684.02 µs 690.77 µs 697.99 µs]
+ thrpt: [22.386 GiB/s 22.620 GiB/s 22.843 GiB/s]
+
+```
diff --git a/core/benches/vs_fs/src/main.rs b/core/benches/vs_fs/src/main.rs
new file mode 100644
index 000000000..7ec09fb02
--- /dev/null
+++ b/core/benches/vs_fs/src/main.rs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use criterion::Criterion;
+use opendal::services;
+use opendal::Operator;
+use rand::prelude::*;
+
+fn main() {
+ let mut c = Criterion::default().configure_from_args();
+ bench_vs_fs(&mut c);
+
+ c.final_summary();
+}
+
+fn bench_vs_fs(c: &mut Criterion) {
+ let mut cfg = services::Fs::default();
+ cfg.root("/tmp/opendal/");
+ let op = Operator::new(cfg).unwrap().finish().blocking();
+
+ let mut group = c.benchmark_group("read");
+ group.throughput(criterion::Throughput::Bytes(16 * 1024 * 1024));
+
+ group.bench_function("std_fs", |b| {
+ let path = format!("/tmp/opendal/{}", prepare());
+ b.iter(|| {
+ let _ = std::fs::read(&path).unwrap();
+ });
+ });
+ group.bench_function("opendal_fs", |b| {
+ let path = prepare();
+ b.iter(|| {
+ let _ = op.read(&path).unwrap();
+ });
+ });
+ group.bench_function("opendal_fs_with_range", |b| {
+ let path = prepare();
+ b.iter(|| {
+ let _ = op
+ .read_with(&path)
+ .range(0..16 * 1024 * 1024)
+ .call()
+ .unwrap();
+ });
+ });
+
+ group.finish()
+}
+
+fn prepare() -> String {
+ let mut rng = thread_rng();
+ let mut content = vec![0; 16 * 1024 * 1024];
+ rng.fill_bytes(&mut content);
+
+ let name = uuid::Uuid::new_v4();
+ let path = format!("/tmp/opendal/{}", name);
+ let _ = std::fs::write(path, content);
+
+ name.to_string()
+}
diff --git a/core/src/raw/oio/read/api.rs b/core/src/raw/oio/read/api.rs
index 2c25f9981..03a362c6d 100644
--- a/core/src/raw/oio/read/api.rs
+++ b/core/src/raw/oio/read/api.rs
@@ -27,6 +27,7 @@ use std::task::Poll;
use bytes::Bytes;
use futures::Future;
use pin_project::pin_project;
+use tokio::io::ReadBuf;
use crate::*;
@@ -359,46 +360,47 @@ pub trait BlockingRead: Send + Sync {
/// Read all data of current reader to the end of buf.
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
- let start = buf.len();
- let mut next = MAX_READ_TO_END_GROW_SIZE;
- let mut length = start;
+ let start_len = buf.len();
+ let start_cap = buf.capacity();
loop {
- if buf.capacity() == length {
- buf.reserve(next);
- // # Safety
- //
- // We make sure that the length of buf is maintained correctly.
- #[allow(clippy::uninit_vec)]
- unsafe {
- buf.set_len(buf.capacity());
- }
+ if buf.len() == buf.capacity() {
+ buf.reserve(32); // buf is full, need more space
}
- let bs = &mut buf[length..];
- match self.read(bs) {
- Ok(0) => {
+ let spare = buf.spare_capacity_mut();
+ let mut read_buf: ReadBuf = ReadBuf::uninit(spare);
+
+ // SAFETY: These bytes were initialized but not filled in the previous loop
+ unsafe {
+ read_buf.assume_init(read_buf.capacity());
+ }
+
+ match self.read(read_buf.initialize_unfilled()) {
+ Ok(0) => return Ok(buf.len() - start_len),
+ Ok(n) => {
+ // SAFETY: Read API makes sure that returning `n` is correct.
unsafe {
- buf.set_len(length);
+ buf.set_len(buf.len() + n);
}
- return Ok(length - start);
}
- Ok(n) => {
- next = if n >= next {
- cmp::min(next.saturating_mul(2), MAX_READ_TO_END_GROW_SIZE)
- } else if n >= next / 2 {
- next
- } else {
- cmp::max(next.saturating_div(2), MIN_READ_TO_END_GROW_SIZE)
- };
+ Err(e) => return Err(e),
+ }
- // We can't allow bogus values from read. If it is too large, the returned vec could have its length
- // set past its capacity, or if it overflows the vec could be shortened which could create an invalid
- // string if this is called via read_to_string.
- assert!(n <= buf.len());
- length += n;
+ // The buffer might be an exact fit. Let's read into a probe buffer
+ // and see if it returns `Ok(0)`. If so, we've avoided an
+ // unnecessary doubling of the capacity. But if not, append the
+ // probe buffer to the primary buffer and let its capacity grow.
+ if buf.len() == buf.capacity() && buf.capacity() == start_cap {
+ let mut probe = [0u8; 32];
+
+ match self.read(&mut probe) {
+ Ok(0) => return Ok(buf.len() - start_len),
+ Ok(n) => {
+ buf.extend_from_slice(&probe[..n]);
+ }
+ Err(e) => return Err(e),
}
- Err(e) => return Err(e),
}
}
}
diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs
index 2b1f4a456..5caed7534 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -18,8 +18,7 @@
use bytes::Bytes;
use super::operator_functions::*;
-use crate::raw::oio::BlockingRead;
-use crate::raw::oio::WriteBuf;
+use crate::raw::oio::{BlockingRead, WriteBuf};
use crate::raw::*;
use crate::*;
@@ -338,9 +337,22 @@ impl BlockingOperator {
);
}
+ let range = args.range();
+ let size_hint = match range.size() {
+ Some(v) => v,
+ None => {
+ let mut size = inner
+ .blocking_stat(&path, OpStat::default())?
+ .into_metadata()
+ .content_length();
+ size -= range.offset().unwrap_or(0);
+ size
+ }
+ };
+
let (_, mut s) = inner.blocking_read(&path, args)?;
- let mut buf = Vec::new();
+ let mut buf = Vec::with_capacity(size_hint as usize);
s.read_to_end(&mut buf)?;
Ok(buf)