This is an automated email from the ASF dual-hosted git repository.
chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fory.git
The following commit(s) were added to refs/heads/main by this push:
new c59d6f954 perf(rust): use segmented pool to reduce contention of fory pool (#2945)
c59d6f954 is described below
commit c59d6f954afab786bdedf0fc392445594fc34fd1
Author: Shawn Yang <[email protected]>
AuthorDate: Fri Nov 28 20:01:00 2025 +0800
perf(rust): use segmented pool to reduce contention of fory pool (#2945)
## Why?
<!-- Describe the purpose of this PR. -->
## What does this PR do?
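Replace the single spinlock-protected context pool with a segmented pool: 16 independently locked segments, with each thread mapped to one segment. This reduces lock contention when a shared `Fory` instance is used from multiple threads. Also add a `threaded_bench` Criterion benchmark that compares single-threaded serialization against 8 threads sharing one `Fory` instance.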
## Related issues
#2924
## Does this PR introduce any user-facing change?
<!--
If any user-facing interface changes, please [open an
issue](https://github.com/apache/fory/issues/new/choose) describing the
need to do so and update the document if necessary.
Delete section if not applicable.
-->
- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?
## Benchmark
<!--
When the PR has an impact on performance (if you don't know whether the
PR will have an impact on performance, you can submit the PR first, and
if it will have impact on performance, the code reviewer will explain
it), be sure to attach a benchmark data here.
Delete section if not applicable.
-->
---
benchmarks/rust_benchmark/Cargo.toml | 5 +
.../rust_benchmark/benches/threaded_bench.rs | 111 +++++++++++++++++++++
rust/fory-core/src/fory.rs | 4 +-
rust/fory-core/src/resolver/context.rs | 37 -------
rust/fory-core/src/resolver/mod.rs | 1 +
rust/fory-core/src/resolver/pool.rs | 91 +++++++++++++++++
6 files changed, 210 insertions(+), 39 deletions(-)
diff --git a/benchmarks/rust_benchmark/Cargo.toml
b/benchmarks/rust_benchmark/Cargo.toml
index 675085ad6..3c66f933a 100644
--- a/benchmarks/rust_benchmark/Cargo.toml
+++ b/benchmarks/rust_benchmark/Cargo.toml
@@ -39,6 +39,11 @@ name = "buffer_read_bench"
path = "benches/buffer_read_bench.rs"
harness = false
+[[bench]]
+name = "threaded_bench"
+path = "benches/threaded_bench.rs"
+harness = false
+
[dependencies]
fory = { path = "../../rust/fory" }
fory-core = { path = "../../rust/fory-core" }
diff --git a/benchmarks/rust_benchmark/benches/threaded_bench.rs
b/benchmarks/rust_benchmark/benches/threaded_bench.rs
new file mode 100644
index 000000000..404d7720a
--- /dev/null
+++ b/benchmarks/rust_benchmark/benches/threaded_bench.rs
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use criterion::{criterion_group, criterion_main, Criterion};
+use fory::Fory;
+use fory_derive::ForyObject;
+use std::hint::black_box;
+use std::sync::Arc;
+use std::thread;
+
+#[cfg(feature = "profiling")]
+use pprof::criterion::{Output, PProfProfiler};
+
+/// Benchmark payload: a flat record of nine `u64` counters and timestamps.
+///
+/// Registered with Fory under type id 2 by `create_fory`.
+#[derive(Debug, ForyObject)]
+pub struct UserSessionMetrics {
+    pub request_count: u64,
+    pub unique_ip_count: u64,
+    pub unique_user_agent_count: u64,
+    pub unique_url_count: u64,
+    pub unique_resource_count: u64,
+    pub active_duration_secs: u64,
+    // NOTE(review): the sample values below look like epoch milliseconds — confirm.
+    pub first_seen_time: u64,
+    pub last_seen_time: u64,
+    pub updated_at: u64,
+}
+
+/// Builds a default `Fory` with `UserSessionMetrics` registered under type id 2.
+///
+/// Panics if the registration is rejected; a benchmark cannot run without it.
+fn create_fory() -> Fory {
+    let mut instance = Fory::default();
+    instance
+        .register::<UserSessionMetrics>(2)
+        .expect("register UserSessionMetrics");
+    instance
+}
+
+/// Returns the fixed `UserSessionMetrics` value serialized by every benchmark,
+/// so all runs and variants work on identical input.
+fn sample_metrics() -> UserSessionMetrics {
+    // Timestamps (presumably epoch milliseconds); `updated_at` equals the last-seen time.
+    let first_seen: u64 = 1_699_999_900_000;
+    let last_seen: u64 = 1_700_000_000_000;
+
+    UserSessionMetrics {
+        first_seen_time: first_seen,
+        last_seen_time: last_seen,
+        updated_at: last_seen,
+        request_count: 256,
+        unique_ip_count: 32,
+        unique_user_agent_count: 12,
+        unique_url_count: 64,
+        unique_resource_count: 48,
+        active_duration_secs: 90,
+    }
+}
+
+/// Measures serialization throughput of a `Fory` instance, first from a single
+/// thread and then with one instance shared by `THREAD_COUNT` scoped threads.
+///
+/// Both variants perform exactly `ITERATIONS` serializations per measured
+/// iteration, so their timings are comparable. The threaded variant also pays
+/// for spawning and joining `THREAD_COUNT` threads on every measured iteration.
+fn bench_threaded_serialization(c: &mut Criterion) {
+    const ITERATIONS: usize = 200_000;
+    const THREAD_COUNT: usize = 8;
+    // Each thread runs ITERATIONS / THREAD_COUNT serializations. An uneven split
+    // would silently drop the remainder and skew the comparison with the baseline.
+    const _: () = assert!(
+        ITERATIONS % THREAD_COUNT == 0,
+        "ITERATIONS must be divisible by THREAD_COUNT"
+    );
+    const PER_THREAD: usize = ITERATIONS / THREAD_COUNT;
+
+    let mut group = c.benchmark_group("ThreadedSerialization");
+
+    // Single-thread baseline.
+    let fory = create_fory();
+    let value = sample_metrics();
+    group.bench_function("single_thread", |b| {
+        b.iter(|| {
+            for _ in 0..ITERATIONS {
+                black_box(fory.serialize(black_box(&value)).unwrap());
+            }
+        });
+    });
+
+    // Multi-threaded: all threads serialize through the same Fory instance, so
+    // they compete for its shared context pool.
+    let shared_fory = Arc::new(create_fory());
+    let shared_value = Arc::new(sample_metrics());
+    // Derive the id from THREAD_COUNT so the label cannot go stale if the count changes.
+    let threaded_id = format!("{THREAD_COUNT}_threads_shared_fory");
+    group.bench_function(threaded_id, |b| {
+        b.iter(|| {
+            thread::scope(|s| {
+                for _ in 0..THREAD_COUNT {
+                    let fory = Arc::clone(&shared_fory);
+                    let value = Arc::clone(&shared_value);
+                    s.spawn(move || {
+                        for _ in 0..PER_THREAD {
+                            // black_box the input like the baseline does, so both
+                            // variants give the optimizer the same opportunities.
+                            black_box(fory.serialize(black_box(value.as_ref())).unwrap());
+                        }
+                    });
+                }
+            });
+        });
+    });
+
+    group.finish();
+}
+
+// With the `profiling` feature enabled, attach a pprof profiler that emits a
+// flamegraph per benchmark. NOTE(review): 100 is presumably the sampling
+// frequency in Hz — confirm against pprof's `PProfProfiler::new`.
+#[cfg(feature = "profiling")]
+criterion_group! {
+    name = benches;
+    config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
+    targets = bench_threaded_serialization
+}
+
+// Default build: plain Criterion configuration, no profiler.
+#[cfg(not(feature = "profiling"))]
+criterion_group!(benches, bench_threaded_serialization);
+
+criterion_main!(benches);
diff --git a/rust/fory-core/src/fory.rs b/rust/fory-core/src/fory.rs
index d9a046bb3..f3ecf8e70 100644
--- a/rust/fory-core/src/fory.rs
+++ b/rust/fory-core/src/fory.rs
@@ -18,8 +18,8 @@
use crate::buffer::{Reader, Writer};
use crate::ensure;
use crate::error::Error;
-use crate::resolver::context::WriteContext;
-use crate::resolver::context::{Pool, ReadContext};
+use crate::resolver::context::{ReadContext, WriteContext};
+use crate::resolver::pool::Pool;
use crate::resolver::type_resolver::TypeResolver;
use crate::serializer::ForyDefault;
use crate::serializer::{Serializer, StructSerializer};
diff --git a/rust/fory-core/src/resolver/context.rs
b/rust/fory-core/src/resolver/context.rs
index c116d6666..7961fc2fe 100644
--- a/rust/fory-core/src/resolver/context.rs
+++ b/rust/fory-core/src/resolver/context.rs
@@ -25,7 +25,6 @@ use
crate::resolver::meta_string_resolver::{MetaStringReaderResolver, MetaString
use crate::resolver::ref_resolver::{RefReader, RefWriter};
use crate::resolver::type_resolver::{TypeInfo, TypeResolver};
use crate::types;
-use crate::util::Spinlock;
use std::rc::Rc;
/// Serialization state container used on a single thread at a time.
@@ -417,39 +416,3 @@ impl<'a> ReadContext<'a> {
self.ref_reader.reset();
}
}
-
-pub struct Pool<T> {
- items: Spinlock<Vec<T>>,
- factory: Box<dyn Fn() -> T + Send + Sync>,
-}
-
-impl<T> Pool<T> {
- pub fn new<F>(factory: F) -> Self
- where
- F: Fn() -> T + Send + Sync + 'static,
- {
- Pool {
- items: Spinlock::new(vec![]),
- factory: Box::new(factory),
- }
- }
-
- #[inline(always)]
- pub fn borrow_mut<Result>(&self, handler: impl FnOnce(&mut T) -> Result) -> Result {
- let mut obj = self.get();
- let result = handler(&mut obj);
- self.put(obj);
- result
- }
-
- #[inline(always)]
- fn get(&self) -> T {
- self.items.lock().pop().unwrap_or_else(|| (self.factory)())
- }
-
- // put back manually
- #[inline(always)]
- fn put(&self, item: T) {
- self.items.lock().push(item);
- }
-}
diff --git a/rust/fory-core/src/resolver/mod.rs
b/rust/fory-core/src/resolver/mod.rs
index 35a3538fa..ad58e725e 100644
--- a/rust/fory-core/src/resolver/mod.rs
+++ b/rust/fory-core/src/resolver/mod.rs
@@ -18,5 +18,6 @@
pub mod context;
pub mod meta_resolver;
pub mod meta_string_resolver;
+pub mod pool;
pub mod ref_resolver;
pub mod type_resolver;
diff --git a/rust/fory-core/src/resolver/pool.rs
b/rust/fory-core/src/resolver/pool.rs
new file mode 100644
index 000000000..b1284a288
--- /dev/null
+++ b/rust/fory-core/src/resolver/pool.rs
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::util::Spinlock;
+use std::cell::Cell;
+use std::sync::atomic::{AtomicU64, Ordering};
+
+/// Number of independently locked segments in every `Pool`. Threads mapped to
+/// different segments never touch the same lock.
+const NUM_SEGMENTS: usize = 16;
+
+/// Global counter that hands each thread a sequence number, used to pick its segment.
+static THREAD_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
+
+thread_local! {
+    /// Segment index for the current thread, computed lazily on first access and
+    /// then fixed for the thread's lifetime. It is shared by all pools.
+    ///
+    /// Threads are assigned round-robin in the order they first access this value.
+    /// The counter is never decremented or rebalanced, so once threads exit and new
+    /// ones start, the currently live threads need not be evenly spread.
+    /// `Relaxed` suffices: only the uniqueness of each `fetch_add` result matters.
+    static SEGMENT_INDEX: Cell<usize> = Cell::new(
+        (THREAD_ID_COUNTER.fetch_add(1, Ordering::Relaxed) as usize) % NUM_SEGMENTS
+    );
+}
+
+/// One shard of a `Pool`: a spinlock-protected stack of idle items.
+///
+/// Segments are stored side by side in `Pool::segments`. Each segment holds only
+/// the lock state plus a `Vec` header, which is typically far smaller than a cache
+/// line. Without padding, neighboring segments would share a line, and threads
+/// on different segments would still contend through false sharing. Aligning to
+/// 128 bytes gives every segment its own line on x86_64 (64-byte lines plus
+/// adjacent-line prefetch) and on aarch64 cores with 128-byte lines.
+#[repr(align(128))]
+struct Segment<T> {
+    items: Spinlock<Vec<T>>,
+}
+
+impl<T> Segment<T> {
+    fn new() -> Self {
+        Segment {
+            items: Spinlock::new(Vec::new()),
+        }
+    }
+
+    /// Takes an idle item from this segment, or builds a new one with `factory`
+    /// when the segment is empty.
+    ///
+    /// The pop is done in its own statement so the lock guard is dropped before
+    /// `factory` runs. Written as one expression
+    /// (`lock().pop().unwrap_or_else(factory)`), the guard is a temporary that
+    /// outlives `unwrap_or_else`. Every other thread mapped to this segment would
+    /// then spin while a fresh item is being constructed.
+    #[inline(always)]
+    fn get(&self, factory: &dyn Fn() -> T) -> T {
+        let pooled = self.items.lock().pop();
+        pooled.unwrap_or_else(factory)
+    }
+
+    /// Returns `item` to this segment so a later `get` can reuse it.
+    #[inline(always)]
+    fn put(&self, item: T) {
+        self.items.lock().push(item);
+    }
+}
+
+/// Object pool split into `NUM_SEGMENTS` independently locked segments.
+///
+/// Every thread is pinned to one segment, chosen by the module's per-thread
+/// counter (not `std::thread::ThreadId`). Threads running at the same time
+/// therefore usually take different locks instead of all serializing on one.
+/// Each segment builds its own items on demand with the shared `factory`, and
+/// an item is always returned to the segment it was taken from.
+pub struct Pool<T> {
+    segments: [Segment<T>; NUM_SEGMENTS],
+    factory: Box<dyn Fn() -> T + Send + Sync>,
+}
+
+impl<T> Pool<T> {
+    /// Creates an empty pool whose items are built lazily by `factory`.
+    pub fn new<F>(factory: F) -> Self
+    where
+        F: Fn() -> T + Send + Sync + 'static,
+    {
+        let segments = std::array::from_fn(|_| Segment::new());
+        let factory: Box<dyn Fn() -> T + Send + Sync> = Box::new(factory);
+        Pool { segments, factory }
+    }
+
+    /// Runs `handler` with exclusive access to a pooled item, then hands the item
+    /// back to the calling thread's segment and returns the handler's result.
+    ///
+    /// If `handler` panics, the item is dropped instead of being put back, so a
+    /// possibly half-updated item is not reused.
+    #[inline(always)]
+    pub fn borrow_mut<R>(&self, handler: impl FnOnce(&mut T) -> R) -> R {
+        let home = &self.segments[SEGMENT_INDEX.with(|slot| slot.get())];
+        let mut item = home.get(&*self.factory);
+        let outcome = handler(&mut item);
+        home.put(item);
+        outcome
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]