This is an automated email from the ASF dual-hosted git repository.
chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fory.git
The following commit(s) were added to refs/heads/main by this push:
new eae25b6af fix(rust): deep clone TypeMeta to prevent UB in concurrent
scenarios (#3511)
eae25b6af is described below
commit eae25b6aff65c48f64d16d24faa51f2abc70bce4
Author: Peiyang He <[email protected]>
AuthorDate: Wed Mar 25 02:39:11 2026 -0400
fix(rust): deep clone TypeMeta to prevent UB in concurrent scenarios (#3511)
## Why?
As mentioned in
https://github.com/apache/fory/pull/3490#issuecomment-4079516008, one
cargo bench case fails consistently with the current code. After some
digging, I believe the failure is caused by this line:
https://github.com/apache/fory/blob/ede9b64750b2f03a238928bea3d081f2b7744b89/rust/fory-core/src/resolver/type_resolver.rs#L1389
The `clone()` method for `TypeResolver` should return a **deep** clone:
https://github.com/apache/fory/blob/ede9b64750b2f03a238928bea3d081f2b7744b89/rust/fory-core/src/resolver/type_resolver.rs#L1316-L1319
But the `type_meta_by_index` field has type `Vec<Option<Rc<TypeMeta>>>`,
so calling `clone()` on it directly actually produces a **shallow**
clone: only the `Rc` pointers are copied, not the `TypeMeta` values.
The `internal_type_info_by_id` field in `TypeResolver` has a
similar type, `Vec<Option<Rc<TypeInfo>>>`, but `clone()` is not called
on it directly.
## What does this PR do?
- Deep clone `type_meta_by_index`. The cargo bench case now runs
successfully and reliably.
- Use the `bazel-contrib/setup-bazel` GitHub action in the C++ sanitizer job.
- The existing test cases in `test_multi_thread.rs` are a little too
weak to expose tricky concurrency issues, so I wrote a new test case that
copies the main logic of the failing cargo bench case. Without the fix,
this new test case fails with high probability.
## Related issues
No.
## AI Contribution Checklist
No.
## Does this PR introduce any user-facing change?
No.
## Benchmark
No.
---
.github/workflows/ci.yml | 22 +++-------
rust/fory-core/src/resolver/type_resolver.rs | 9 +++-
rust/tests/tests/test_multi_thread.rs | 64 +++++++++++++++++++++++++++-
3 files changed, 76 insertions(+), 19 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 96cfd296c..f381fc8b2 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -616,15 +616,11 @@ jobs:
with:
python-version: 3.11
cache: 'pip'
- - name: Cache Bazel binary
- uses: actions/cache@v4
+ - name: Set up Bazel
+ uses:
bazel-contrib/setup-bazel@083175551ceeceebc757ebee2127fde78840ca77 # 0.18.0
with:
- path: |
- ~/bin/bazel
- ~/.local/bin/bazel
- key: bazel-binary-${{ runner.os }}-${{ runner.arch }}-${{
hashFiles('.bazelversion') }}
- restore-keys: |
- bazel-binary-${{ runner.os }}-${{ runner.arch }}-
+ bazelisk-cache: true
+ bazelisk-version: '1.x'
- name: Cache Bazel repository cache
uses: actions/cache@v4
with:
@@ -644,18 +640,10 @@ jobs:
run: |
ARCH="$(uname -m)"
BAZEL_CONFIGS="--config=${{ matrix.sanitizer }}"
- if [[ -x ~/bin/bazel ]]; then
- BAZEL_BIN=~/bin/bazel
- elif [[ -x ~/.local/bin/bazel ]]; then
- BAZEL_BIN=~/.local/bin/bazel
- else
- echo "bazel not found in ~/bin or ~/.local/bin"
- exit 1
- fi
if [[ "${ARCH}" == "x86_64" || "${ARCH}" == "amd64" ]]; then
BAZEL_CONFIGS="--config=x86_64 ${BAZEL_CONFIGS}"
fi
- ${BAZEL_BIN} test --cache_test_results=no ${BAZEL_CONFIGS}
$(${BAZEL_BIN} query //cpp/...)
+ bazel test --cache_test_results=no ${BAZEL_CONFIGS} $(bazel query
//cpp/...)
- name: Upload Bazel Test Logs (${{ matrix.sanitizer }})
uses: actions/upload-artifact@v4
if: ${{ !cancelled() }}
diff --git a/rust/fory-core/src/resolver/type_resolver.rs
b/rust/fory-core/src/resolver/type_resolver.rs
index 4c67f483a..fe9d81306 100644
--- a/rust/fory-core/src/resolver/type_resolver.rs
+++ b/rust/fory-core/src/resolver/type_resolver.rs
@@ -1376,6 +1376,13 @@ impl TypeResolver {
})
.collect();
+ // Deep clone the TypeMeta as well
+ let type_meta_by_index: Vec<Option<Rc<TypeMeta>>> = self
+ .type_meta_by_index
+ .iter()
+ .map(|opt| opt.as_ref().map(|meta| Rc::new(meta.deep_clone())))
+ .collect();
+
TypeResolver {
internal_type_info_by_id,
user_type_info_by_id,
@@ -1386,7 +1393,7 @@ impl TypeResolver {
type_id_index: self.type_id_index.clone(),
user_type_id_index: self.user_type_id_index.clone(),
rust_type_id_by_index: self.rust_type_id_by_index.clone(),
- type_meta_by_index: self.type_meta_by_index.clone(),
+ type_meta_by_index,
compatible: self.compatible,
xlang: self.xlang,
}
diff --git a/rust/tests/tests/test_multi_thread.rs
b/rust/tests/tests/test_multi_thread.rs
index a8a7678c5..35610e393 100644
--- a/rust/tests/tests/test_multi_thread.rs
+++ b/rust/tests/tests/test_multi_thread.rs
@@ -18,7 +18,7 @@
use fory_core::Fory;
use fory_derive::ForyObject;
use std::collections::HashSet;
-use std::sync::Arc;
+use std::sync::{Arc, Barrier};
use std::thread;
#[test]
@@ -100,3 +100,65 @@ fn test_struct_multi_thread() {
// verify
assert_eq!(dest, src);
}
+
+#[test]
+fn test_multiple_threads_shared_fory() {
+ const THREAD_COUNT: usize = 8;
+ const ROUNDS: usize = 200;
+ const ITERATIONS_PER_THREAD: usize = 256;
+
+ #[derive(Debug, ForyObject)]
+ struct UserSessionMetrics {
+ #[fory(id = 0)]
+ request_count: u64,
+ #[fory(id = 1)]
+ unique_ip_count: u64,
+ #[fory(id = 2)]
+ unique_user_agent_count: u64,
+ #[fory(id = 3)]
+ unique_url_count: u64,
+ #[fory(id = 4)]
+ unique_resource_count: u64,
+ #[fory(id = 5)]
+ active_duration_secs: u64,
+ #[fory(id = 6)]
+ first_seen_time: u64,
+ #[fory(id = 7)]
+ last_seen_time: u64,
+ #[fory(id = 8)]
+ updated_at: u64,
+ }
+
+ let mut fory = Fory::default();
+ fory.register::<UserSessionMetrics>(2)
+ .expect("register UserSessionMetrics");
+ let shared_fory = Arc::new(fory);
+ let shared_value = Arc::new(UserSessionMetrics {
+ request_count: 256,
+ unique_ip_count: 32,
+ unique_user_agent_count: 12,
+ unique_url_count: 64,
+ unique_resource_count: 48,
+ active_duration_secs: 90,
+ first_seen_time: 1_699_999_900_000,
+ last_seen_time: 1_700_000_000_000,
+ updated_at: 1_700_000_000_000,
+ });
+
+ for _ in 0..ROUNDS {
+ thread::scope(|s| {
+ let start_barrier = Arc::new(Barrier::new(THREAD_COUNT));
+ for _ in 0..THREAD_COUNT {
+ let fory = Arc::clone(&shared_fory);
+ let value = Arc::clone(&shared_value);
+ let start_barrier = Arc::clone(&start_barrier);
+ s.spawn(move || {
+ start_barrier.wait();
+ for _ in 0..ITERATIONS_PER_THREAD {
+ let _ = fory.serialize(value.as_ref()).unwrap();
+ }
+ });
+ }
+ });
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]