This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new f883c5094 feat: introduce foyer layer (#6366)
f883c5094 is described below
commit f883c509481526d0d21044771ea74b622f00cef8
Author: Croxx <[email protected]>
AuthorDate: Mon Jan 12 22:00:14 2026 +0800
feat: introduce foyer layer (#6366)
* feat: introduce foyer layer, partially impl it
Signed-off-by: MrCroxx <[email protected]>
* test: add FoyerLayer unit tests
Signed-off-by: MrCroxx <[email protected]>
* fix: bump foyer and fix bugs in deletion
Signed-off-by: MrCroxx <[email protected]>
* chore: bump foyer version included in the main branch
Signed-off-by: MrCroxx <[email protected]>
* doc: fix typos
Signed-off-by: MrCroxx <[email protected]>
* fix: bump foyer version, expose error from foyer
Signed-off-by: MrCroxx <[email protected]>
* chore: make clippy happier
Signed-off-by: MrCroxx <[email protected]>
* refactor: resolve comments
Signed-off-by: MrCroxx <[email protected]>
* feat: introduce size_limit to opt-out large objects
Signed-off-by: MrCroxx <[email protected]>
* refactor: add version to foyer cache key
Signed-off-by: MrCroxx <[email protected]>
* fix: fmt
* fix: cargo check --all-features
* fix: cargo clippy --all-features
* refactor: utilize bincode for FoyerKey
* fix: rm the mis-committed mod.rs
* Update core/layers/foyer/src/lib.rs
Co-authored-by: Jorge Hermo <[email protected]>
* Update core/layers/foyer/src/lib.rs
Co-authored-by: Jorge Hermo <[email protected]>
* not to cache it if the object is bigger than size_limit
* add test case for size_limit check
* fix: if the write is too big, avoid appending to buf
---------
Signed-off-by: MrCroxx <[email protected]>
Co-authored-by: Asuka Minato <[email protected]>
Co-authored-by: Li Yazhou <[email protected]>
Co-authored-by: Jorge Hermo <[email protected]>
---
core/Cargo.lock | 425 +++++++++++++-
core/Cargo.toml | 2 +
...er_integration.md => 6370_foyer_integration.md} | 0
core/layers/foyer/Cargo.toml | 45 ++
core/layers/foyer/src/lib.rs | 630 +++++++++++++++++++++
core/src/lib.rs | 2 +
6 files changed, 1094 insertions(+), 10 deletions(-)
diff --git a/core/Cargo.lock b/core/Cargo.lock
index 27894c616..2c2fd9bba 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -566,6 +566,18 @@ dependencies = [
"syn 2.0.111",
]
+[[package]]
+name = "auto_enums"
+version = "0.8.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c170965892137a3a9aeb000b4524aa3cc022a310e709d848b6e1cdce4ab4781"
+dependencies = [
+ "derive_utils",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.111",
+]
+
[[package]]
name = "autocfg"
version = "1.5.0"
@@ -1712,7 +1724,7 @@ dependencies = [
"anstream",
"anstyle",
"clap_lex",
- "strsim",
+ "strsim 0.11.1",
"terminal_size",
]
@@ -1743,6 +1755,12 @@ dependencies = [
"cc",
]
+[[package]]
+name = "cmsketch"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d7ee2cfacbd29706479902b06d75ad8f1362900836aa32799eabc7e004bfd854"
+
[[package]]
name = "coarsetime"
version = "0.1.36"
@@ -2260,6 +2278,16 @@ dependencies = [
"cipher",
]
+[[package]]
+name = "darling"
+version = "0.14.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b750cb3417fd1b327431a470f388520309479ab0bf5e323505daf0290cd3850"
+dependencies = [
+ "darling_core 0.14.4",
+ "darling_macro 0.14.4",
+]
+
[[package]]
name = "darling"
version = "0.20.11"
@@ -2280,6 +2308,20 @@ dependencies = [
"darling_macro 0.21.3",
]
+[[package]]
+name = "darling_core"
+version = "0.14.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "109c1ca6e6b7f82cc233a97004ea8ed7ca123a9af07a8230878fcfda9b158bf0"
+dependencies = [
+ "fnv",
+ "ident_case",
+ "proc-macro2",
+ "quote",
+ "strsim 0.10.0",
+ "syn 1.0.109",
+]
+
[[package]]
name = "darling_core"
version = "0.20.11"
@@ -2290,7 +2332,7 @@ dependencies = [
"ident_case",
"proc-macro2",
"quote",
- "strsim",
+ "strsim 0.11.1",
"syn 2.0.111",
]
@@ -2304,10 +2346,21 @@ dependencies = [
"ident_case",
"proc-macro2",
"quote",
- "strsim",
+ "strsim 0.11.1",
"syn 2.0.111",
]
+[[package]]
+name = "darling_macro"
+version = "0.14.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a4aab4dbc9f7611d8b55048a3a16d2d010c2c8334e46304b40ac1cc14bf3b48e"
+dependencies = [
+ "darling_core 0.14.4",
+ "quote",
+ "syn 1.0.109",
+]
+
[[package]]
name = "darling_macro"
version = "0.20.11"
@@ -2503,6 +2556,17 @@ dependencies = [
"unicode-xid",
]
+[[package]]
+name = "derive_utils"
+version = "0.15.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ccfae181bab5ab6c5478b2ccb69e4c68a02f8c3ec72f6616bfec9dbc599d2ee0"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.111",
+]
+
[[package]]
name = "des"
version = "0.8.1"
@@ -2630,6 +2694,12 @@ version = "0.15.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b"
+[[package]]
+name = "downcast-rs"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "75b325c5dbd37f80359721ad39aca5a29fb04c89279657cffdda8736d0c0b9d2"
+
[[package]]
name = "dtoa"
version = "1.0.10"
@@ -3157,6 +3227,112 @@ dependencies = [
"uuid",
]
+[[package]]
+name = "foyer"
+version = "0.18.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "642093b1a72c4a0ef89862484d669a353e732974781bb9c49a979526d1e30edc"
+dependencies = [
+ "equivalent",
+ "foyer-common",
+ "foyer-memory",
+ "foyer-storage",
+ "madsim-tokio",
+ "mixtrics",
+ "pin-project",
+ "serde",
+ "thiserror 2.0.17",
+ "tokio",
+ "tracing",
+]
+
+[[package]]
+name = "foyer-common"
+version = "0.18.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9db9c0e4648b13e9216d785b308d43751ca975301aeb83e607ec630b6f956944"
+dependencies = [
+ "bincode",
+ "bytes",
+ "cfg-if",
+ "itertools 0.14.0",
+ "madsim-tokio",
+ "mixtrics",
+ "parking_lot 0.12.5",
+ "pin-project",
+ "serde",
+ "thiserror 2.0.17",
+ "tokio",
+ "twox-hash",
+]
+
+[[package]]
+name = "foyer-intrusive-collections"
+version = "0.10.0-dev"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e4fee46bea69e0596130e3210e65d3424e0ac1e6df3bde6636304bdf1ca4a3b"
+dependencies = [
+ "memoffset 0.9.1",
+]
+
+[[package]]
+name = "foyer-memory"
+version = "0.18.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "040dc38acbfca8f1def26bbbd9e9199090884aabb15de99f7bf4060be66ff608"
+dependencies = [
+ "arc-swap",
+ "bitflags 2.10.0",
+ "cmsketch",
+ "equivalent",
+ "foyer-common",
+ "foyer-intrusive-collections",
+ "hashbrown 0.15.5",
+ "itertools 0.14.0",
+ "madsim-tokio",
+ "mixtrics",
+ "parking_lot 0.12.5",
+ "pin-project",
+ "serde",
+ "thiserror 2.0.17",
+ "tokio",
+ "tracing",
+]
+
+[[package]]
+name = "foyer-storage"
+version = "0.18.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "54a77ed888da490e997da6d6d62fcbce3f202ccf28be098c4ea595ca046fc4a9"
+dependencies = [
+ "allocator-api2",
+ "anyhow",
+ "auto_enums",
+ "bytes",
+ "equivalent",
+ "flume 0.11.1",
+ "foyer-common",
+ "foyer-memory",
+ "fs4",
+ "futures-core",
+ "futures-util",
+ "itertools 0.14.0",
+ "libc",
+ "lz4",
+ "madsim-tokio",
+ "ordered_hash_map",
+ "parking_lot 0.12.5",
+ "paste",
+ "pin-project",
+ "rand 0.9.2",
+ "serde",
+ "thiserror 2.0.17",
+ "tokio",
+ "tracing",
+ "twox-hash",
+ "zstd",
+]
+
[[package]]
name = "fs2"
version = "0.4.3"
@@ -3167,6 +3343,16 @@ dependencies = [
"winapi",
]
+[[package]]
+name = "fs4"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8640e34b88f7652208ce9e88b1a37a2ae95227d84abec377ccd3c5cfeb141ed4"
+dependencies = [
+ "rustix 1.1.2",
+ "windows-sys 0.59.0",
+]
+
[[package]]
name = "fs_extra"
version = "1.3.0"
@@ -3639,6 +3825,15 @@ dependencies = [
"ahash 0.7.8",
]
+[[package]]
+name = "hashbrown"
+version = "0.13.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e"
+dependencies = [
+ "ahash 0.8.12",
+]
+
[[package]]
name = "hashbrown"
version = "0.14.5"
@@ -4854,6 +5049,25 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
+[[package]]
+name = "lz4"
+version = "1.28.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a20b523e860d03443e98350ceaac5e71c6ba89aea7d960769ec3ce37f4de5af4"
+dependencies = [
+ "lz4-sys",
+]
+
+[[package]]
+name = "lz4-sys"
+version = "1.11.1+lz4-1.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6bd8c0d6c6ed0cd30b3652886bb8711dc4bb01d637a68105a3d5158039b418e6"
+dependencies = [
+ "cc",
+ "libc",
+]
+
[[package]]
name = "mac"
version = "0.1.1"
@@ -4908,6 +5122,61 @@ dependencies = [
"syn 2.0.111",
]
+[[package]]
+name = "madsim"
+version = "0.2.34"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "18351aac4194337d6ea9ffbd25b3d1540ecc0754142af1bff5ba7392d1f6f771"
+dependencies = [
+ "ahash 0.8.12",
+ "async-channel 2.5.0",
+ "async-stream",
+ "async-task",
+ "bincode",
+ "bytes",
+ "downcast-rs",
+ "errno",
+ "futures-util",
+ "lazy_static",
+ "libc",
+ "madsim-macros",
+ "naive-timer",
+ "panic-message",
+ "rand 0.8.5",
+ "rand_xoshiro",
+ "rustversion",
+ "serde",
+ "spin",
+ "tokio",
+ "tokio-util",
+ "toml",
+ "tracing",
+ "tracing-subscriber",
+]
+
+[[package]]
+name = "madsim-macros"
+version = "0.2.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f3d248e97b1a48826a12c3828d921e8548e714394bf17274dd0a93910dc946e1"
+dependencies = [
+ "darling 0.14.4",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
+[[package]]
+name = "madsim-tokio"
+version = "0.2.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7d3eb2acc57c82d21d699119b859e2df70a91dbdb84734885a1e72be83bdecb5"
+dependencies = [
+ "madsim",
+ "spin",
+ "tokio",
+]
+
[[package]]
name = "maplit"
version = "1.0.2"
@@ -5019,6 +5288,15 @@ dependencies = [
"autocfg",
]
+[[package]]
+name = "memoffset"
+version = "0.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a"
+dependencies = [
+ "autocfg",
+]
+
[[package]]
name = "metrics"
version = "0.24.3"
@@ -5122,6 +5400,16 @@ dependencies = [
"windows-sys 0.61.2",
]
+[[package]]
+name = "mixtrics"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fb252c728b9d77c6ef9103f0c81524fa0a3d3b161d0a936295d7fbeff6e04c11"
+dependencies = [
+ "itertools 0.14.0",
+ "parking_lot 0.12.5",
+]
+
[[package]]
name = "moka"
version = "0.12.12"
@@ -5195,7 +5483,7 @@ dependencies = [
"sha2",
"socket2 0.6.1",
"stringprep",
- "strsim",
+ "strsim 0.11.1",
"take_mut",
"thiserror 2.0.17",
"tokio",
@@ -5275,6 +5563,12 @@ version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084"
+[[package]]
+name = "naive-timer"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "034a0ad7deebf0c2abcf2435950a6666c3c15ea9d8fad0c0f48efa8a7f843fed"
+
[[package]]
name = "nanoid"
version = "0.4.0"
@@ -5363,7 +5657,7 @@ dependencies = [
"bitflags 1.3.2",
"cfg-if",
"libc",
- "memoffset",
+ "memoffset 0.7.1",
"pin-utils",
]
@@ -5578,6 +5872,7 @@ dependencies = [
"opendal-layer-dtrace",
"opendal-layer-fastmetrics",
"opendal-layer-fastrace",
+ "opendal-layer-foyer",
"opendal-layer-hotpath",
"opendal-layer-immutable-index",
"opendal-layer-logging",
@@ -5832,6 +6127,19 @@ dependencies = [
"tokio",
]
+[[package]]
+name = "opendal-layer-foyer"
+version = "0.55.0"
+dependencies = [
+ "bincode",
+ "foyer",
+ "opendal-core",
+ "serde",
+ "size",
+ "tempfile",
+ "tokio",
+]
+
[[package]]
name = "opendal-layer-hotpath"
version = "0.55.0"
@@ -7053,6 +7361,15 @@ dependencies = [
"hashbrown 0.14.5",
]
+[[package]]
+name = "ordered_hash_map"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ab0e5f22bf6dd04abd854a8874247813a8fa2c8c1260eba6fbb150270ce7c176"
+dependencies = [
+ "hashbrown 0.13.2",
+]
+
[[package]]
name = "os_pipe"
version = "1.2.3"
@@ -7080,6 +7397,12 @@ dependencies = [
"sha2",
]
+[[package]]
+name = "panic-message"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "384e52fd8fbd4cbe3c317e8216260c21a0f9134de108cea8a4dd4e7e152c472d"
+
[[package]]
name = "parking"
version = "2.2.1"
@@ -8059,6 +8382,15 @@ dependencies = [
"rand_core 0.5.1",
]
+[[package]]
+name = "rand_xoshiro"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa"
+dependencies = [
+ "rand_core 0.6.4",
+]
+
[[package]]
name = "raw-cpuid"
version = "11.6.0"
@@ -9076,6 +9408,15 @@ dependencies = [
"zmij",
]
+[[package]]
+name = "serde_spanned"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8bbf91e5a4d6315eee45e704372590b30e260ee83af6639d64557f51b067776"
+dependencies = [
+ "serde_core",
+]
+
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
@@ -9681,6 +10022,12 @@ dependencies = [
"unicode-properties",
]
+[[package]]
+name = "strsim"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
+
[[package]]
name = "strsim"
version = "0.11.1"
@@ -9868,7 +10215,7 @@ dependencies = [
"sha2",
"snap",
"storekey",
- "strsim",
+ "strsim 0.11.1",
"subtle",
"sysinfo",
"thiserror 1.0.69",
@@ -10413,11 +10760,26 @@ dependencies = [
"tokio",
]
+[[package]]
+name = "toml"
+version = "0.9.10+spec-1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0825052159284a1a8b4d6c0c86cbc801f2da5afd2b225fa548c72f2e74002f48"
+dependencies = [
+ "indexmap 2.12.1",
+ "serde_core",
+ "serde_spanned",
+ "toml_datetime",
+ "toml_parser",
+ "toml_writer",
+ "winnow",
+]
+
[[package]]
name = "toml_datetime"
-version = "0.7.3"
+version = "0.7.5+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f2cdb639ebbc97961c51720f858597f7f24c4fc295327923af55b74c3c724533"
+checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347"
dependencies = [
"serde_core",
]
@@ -10436,13 +10798,19 @@ dependencies = [
[[package]]
name = "toml_parser"
-version = "1.0.4"
+version = "1.0.6+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c0cbe268d35bdb4bb5a56a2de88d0ad0eb70af5384a99d648cd4b3d04039800e"
+checksum = "a3198b4b0a8e11f09dd03e133c0280504d0801269e9afa46362ffde1cbeebf44"
dependencies = [
"winnow",
]
+[[package]]
+name = "toml_writer"
+version = "1.0.6+spec-1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ab16f14aed21ee8bfd8ec22513f7287cd4a91aa92e44edfe2c17ddd004e92607"
+
[[package]]
name = "tonic"
version = "0.10.2"
@@ -10766,6 +11134,15 @@ dependencies = [
"utf-8",
]
+[[package]]
+name = "twox-hash"
+version = "2.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c"
+dependencies = [
+ "rand 0.9.2",
+]
+
[[package]]
name = "typed-builder"
version = "0.22.0"
@@ -11918,3 +12295,31 @@ name = "zmij"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30e0d8dffbae3d840f64bda38e28391faef673a7b5a6017840f2a106c8145868"
+
+[[package]]
+name = "zstd"
+version = "0.13.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a"
+dependencies = [
+ "zstd-safe",
+]
+
+[[package]]
+name = "zstd-safe"
+version = "7.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d"
+dependencies = [
+ "zstd-sys",
+]
+
+[[package]]
+name = "zstd-sys"
+version = "2.0.16+zstd.1.5.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748"
+dependencies = [
+ "cc",
+ "pkg-config",
+]
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 55b3962b9..f8002096b 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -99,6 +99,7 @@ layers-concurrent-limit =
["dep:opendal-layer-concurrent-limit"]
layers-dtrace = ["dep:opendal-layer-dtrace"]
layers-fastmetrics = ["dep:opendal-layer-fastmetrics"]
layers-fastrace = ["dep:opendal-layer-fastrace"]
+layers-foyer = ["dep:opendal-layer-foyer"]
layers-hotpath = ["dep:opendal-layer-hotpath"]
layers-immutable-index = ["dep:opendal-layer-immutable-index"]
layers-logging = ["dep:opendal-layer-logging"]
@@ -211,6 +212,7 @@ opendal-layer-chaos = { path = "layers/chaos", version =
"0.55.0", optional = tr
opendal-layer-concurrent-limit = { path = "layers/concurrent-limit", version =
"0.55.0", optional = true, default-features = false }
opendal-layer-fastmetrics = { path = "layers/fastmetrics", version = "0.55.0",
optional = true, default-features = false }
opendal-layer-fastrace = { path = "layers/fastrace", version = "0.55.0",
optional = true, default-features = false }
+opendal-layer-foyer = { path = "layers/foyer", version = "0.55.0", optional =
true, default-features = false }
opendal-layer-hotpath = { path = "layers/hotpath", version = "0.55.0",
optional = true, default-features = false }
opendal-layer-immutable-index = { path = "layers/immutable-index", version =
"0.55.0", optional = true, default-features = false }
opendal-layer-logging = { path = "layers/logging", version = "0.55.0",
optional = true, default-features = false }
diff --git a/core/core/src/docs/rfcs/0000_foyer_integration.md
b/core/core/src/docs/rfcs/6370_foyer_integration.md
similarity index 100%
rename from core/core/src/docs/rfcs/0000_foyer_integration.md
rename to core/core/src/docs/rfcs/6370_foyer_integration.md
diff --git a/core/layers/foyer/Cargo.toml b/core/layers/foyer/Cargo.toml
new file mode 100644
index 000000000..bba6fbc98
--- /dev/null
+++ b/core/layers/foyer/Cargo.toml
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+description = "Apache OpenDAL foyer hybrid cache layer"
+name = "opendal-layer-foyer"
+
+authors = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+license = { workspace = true }
+repository = { workspace = true }
+rust-version = { workspace = true }
+version = { workspace = true }
+
+[package.metadata.docs.rs]
+all-features = true
+
+[dependencies]
+bincode = "1"
+foyer = { version = "0.18", features = ["serde"] }
+opendal-core = { path = "../../core", version = "0.55.0", default-features =
false }
+serde = { workspace = true, features = ["derive"] }
+
+[dev-dependencies]
+opendal-core = { path = "../../core", version = "0.55.0", features = [
+ "services-memory",
+] }
+size = "0.5"
+tempfile = "3"
+tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
diff --git a/core/layers/foyer/src/lib.rs b/core/layers/foyer/src/lib.rs
new file mode 100644
index 000000000..1242204a8
--- /dev/null
+++ b/core/layers/foyer/src/lib.rs
@@ -0,0 +1,630 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+ future::Future,
+ ops::{Bound, Deref, Range, RangeBounds},
+ sync::Arc,
+};
+
+use foyer::{Code, CodeError, Error as FoyerError, HybridCache};
+
+use opendal_core::raw::oio::*;
+use opendal_core::raw::*;
+use opendal_core::*;
+
+/// Custom error type for when fetched data exceeds size limit.
+#[derive(Debug)]
+struct FetchSizeTooLarge;
+
+impl std::fmt::Display for FetchSizeTooLarge {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "fetched data size exceeds size limit")
+ }
+}
+
+impl std::error::Error for FetchSizeTooLarge {}
+
+fn extract_err(e: FoyerError) -> Error {
+ let e = match e.downcast::<Error>() {
+ Ok(e) => return e,
+ Err(e) => e,
+ };
+ Error::new(ErrorKind::Unexpected, e.to_string())
+}
+
+/// Cache key used by [`FoyerLayer`]: an object path plus an optional version.
+///
+/// Keys are encoded with bincode through foyer's "serde" feature, so the
+/// field order below is part of the serialized key and must not change.
+///
+/// The version comes from the [`OpRead`] arguments on reads, and from the
+/// metadata returned by a completed write on writes:
+///
+/// - If a version is given, the object is cached under that versioned key.
+/// - If no version is supplied, the object is cached exactly as returned by
+///   the backend. `None` is NOT interpreted as "latest" and is never promoted
+///   to any other version.
+#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
+pub struct FoyerKey {
+    /// Path of the object.
+    pub path: String,
+    /// Version of the object, if any.
+    pub version: Option<String>,
+}
+
+/// Cached object contents: a thin wrapper around [`Buffer`] that exists so
+/// the bytes can implement foyer's [`Code`] trait for serialization.
+#[derive(Debug)]
+pub struct FoyerValue(pub Buffer);
+
+/// Exposes the wrapped [`Buffer`] so cache entries can be measured and
+/// sliced directly.
+impl Deref for FoyerValue {
+    type Target = Buffer;
+
+    fn deref(&self) -> &Buffer {
+        let FoyerValue(bytes) = self;
+        bytes
+    }
+}
+
+/// Serialized form: an 8-byte little-endian length prefix followed by the
+/// raw object bytes.
+impl Code for FoyerValue {
+    fn encode(&self, writer: &mut impl std::io::Write) -> std::result::Result<(), CodeError> {
+        let len = self.0.len() as u64;
+        writer.write_all(&len.to_le_bytes())?;
+        // The clone is consumed as a reader so `self` stays intact.
+        std::io::copy(&mut self.0.clone(), writer)?;
+        Ok(())
+    }
+
+    fn decode(reader: &mut impl std::io::Read) -> std::result::Result<Self, CodeError>
+    where
+        Self: Sized,
+    {
+        use std::io::Read as _;
+
+        let mut len_bytes = [0u8; 8];
+        reader.read_exact(&mut len_bytes)?;
+        let len = u64::from_le_bytes(len_bytes);
+
+        // The length prefix comes from cache storage, so do not trust it for a
+        // single up-front allocation: pre-allocate at most 1 MiB and let
+        // `take` bound how much is actually read.
+        let mut buffer = Vec::with_capacity(len.min(1 << 20) as usize);
+        std::io::Read::take(&mut *reader, len).read_to_end(&mut buffer)?;
+        if buffer.len() as u64 != len {
+            // Fewer bytes than announced: the stored entry is truncated.
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::UnexpectedEof,
+                "truncated foyer value",
+            )
+            .into());
+        }
+        Ok(FoyerValue(buffer.into()))
+    }
+
+    fn estimated_size(&self) -> usize {
+        // Length prefix plus payload, matching `encode`.
+        8 + self.0.len()
+    }
+}
+
+/// Hybrid cache layer for OpenDAL that uses
[foyer](https://github.com/foyer-rs/foyer) for caching.
+///
+/// # Operation Behavior
+/// - `write`: [`FoyerLayer`] will write to the foyer hybrid cache after the
service's write operation is completed.
+/// - `read`: [`FoyerLayer`] will first check the foyer hybrid cache for the
data. If the data is not found, it will perform the read operation on the
service and cache the result.
+/// - `delete`: [`FoyerLayer`] will remove the data from the foyer hybrid
cache regardless of whether the service's delete operation is successful.
+/// - Other operations: [`FoyerLayer`] will not cache the results of other
operations, such as `list`, `copy`, `rename`, etc. They will be passed through
to the underlying accessor without caching.
+///
+/// # Examples
+///
+/// ```no_run
+/// use opendal_core::{Operator, services::Memory};
+/// use opendal_layer_foyer::FoyerLayer;
+/// use foyer::{HybridCacheBuilder, Engine};
+///
+/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
+/// let cache = HybridCacheBuilder::new()
+/// .memory(64 * 1024 * 1024) // 64MB memory cache
+/// .with_shards(4)
+/// .storage(Engine::Large(Default::default()))
+/// .build()
+/// .await?;
+///
+/// let op = Operator::new(Memory::default())?
+/// .layer(FoyerLayer::new(cache))
+/// .finish();
+/// # Ok(())
+/// # }
+/// ```
+///
+/// # Note
+///
+/// If the object version is enabled, the foyer cache layer will treat the
objects with same path but different versions as different objects.
+#[derive(Debug)]
+pub struct FoyerLayer {
+ cache: HybridCache<FoyerKey, FoyerValue>,
+ size_limit: Range<usize>,
+}
+
+impl FoyerLayer {
+    /// Creates a new `FoyerLayer` with the given foyer hybrid cache.
+    ///
+    /// By default every object size is eligible for caching
+    /// (`0..usize::MAX`); use [`FoyerLayer::with_size_limit`] to narrow it.
+    pub fn new(cache: HybridCache<FoyerKey, FoyerValue>) -> Self {
+        FoyerLayer {
+            cache,
+            size_limit: 0..usize::MAX,
+        }
+    }
+
+    /// Sets the range of object sizes, in bytes, that may be cached.
+    ///
+    /// Any range form is accepted (`a..b`, `a..=b`, `..b`, `a..`, `..`); it is
+    /// normalized to a half-open `start..end` range. Bounds at `usize::MAX`
+    /// saturate instead of overflowing.
+    ///
+    /// It is recommended to set a size limit to avoid caching large files
+    /// that may not be suitable for caching.
+    pub fn with_size_limit<R: RangeBounds<usize>>(mut self, size_limit: R) -> Self {
+        let start = match size_limit.start_bound() {
+            Bound::Included(&v) => v,
+            Bound::Excluded(&v) => v.saturating_add(1),
+            Bound::Unbounded => 0,
+        };
+        let end = match size_limit.end_bound() {
+            Bound::Included(&v) => v.saturating_add(1),
+            Bound::Excluded(&v) => v,
+            Bound::Unbounded => usize::MAX,
+        };
+        self.size_limit = start..end;
+        self
+    }
+}
+
+impl<A: Access> Layer<A> for FoyerLayer {
+    type LayeredAccess = FoyerAccessor<A>;
+
+    /// Wraps `accessor`, sharing this layer's cache handle and size limit.
+    fn layer(&self, accessor: A) -> Self::LayeredAccess {
+        let shared = Inner {
+            accessor,
+            cache: self.cache.clone(),
+            size_limit: self.size_limit.clone(),
+        };
+        FoyerAccessor {
+            inner: Arc::new(shared),
+        }
+    }
+}
+
+/// State shared by [`FoyerAccessor`] and the writers/deleters it creates.
+#[derive(Debug)]
+struct Inner<A: Access> {
+    /// The wrapped accessor that actually serves requests.
+    accessor: A,
+    /// Handle to the foyer hybrid cache.
+    cache: HybridCache<FoyerKey, FoyerValue>,
+    /// Half-open range of object sizes (in bytes) eligible for caching.
+    size_limit: Range<usize>,
+}
+
+/// Accessor produced by [`FoyerLayer`]; see the layer docs for the caching
+/// behavior of each operation.
+#[derive(Debug)]
+pub struct FoyerAccessor<A: Access> {
+    inner: Arc<Inner<A>>,
+}
+
+impl<A: Access> LayeredAccess for FoyerAccessor<A> {
+ type Inner = A;
+ type Reader = Buffer;
+ type Writer = Writer<A>;
+ type Lister = A::Lister;
+ type Deleter = Deleter<A>;
+
+ fn inner(&self) -> &Self::Inner {
+ &self.inner.accessor
+ }
+
+ fn info(&self) -> Arc<AccessorInfo> {
+ self.inner.accessor.info()
+ }
+
+ async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
+ let path_str = path.to_string();
+ let version = args.version().map(|v| v.to_string());
+ let original_args = args.clone();
+
+ // Extract range bounds before async block to avoid lifetime issues
+ let (range_start, range_end) = {
+ let r = args.range();
+ let start = r.offset();
+ let end = r.size().map(|size| start + size);
+ (start, end)
+ };
+
+ // Use fetch to read data from cache or fallback to remote. fetch()
can automatically
+ // handle the thundering herd problem by ensuring only one request is
made for a given
+ // key.
+ //
+ // Please note that we only cache the object if it's smaller than
size_limit. And we'll
+ // fetch the ENTIRE object from remote to put it into cache, then
slice it to the requested
+ // range.
+ let result = self
+ .inner
+ .cache
+ .fetch(
+ FoyerKey {
+ path: path_str.clone(),
+ version: version.clone(),
+ },
+ || {
+ let inner = self.inner.clone();
+ let path_clone = path_str.clone();
+ async move {
+ // read the metadata first, if it's too large, do not
cache
+ let metadata = inner
+ .accessor
+ .stat(&path_clone, OpStat::default())
+ .await
+ .map_err(FoyerError::other)?
+ .into_metadata();
+
+ let size = metadata.content_length() as usize;
+ if !inner.size_limit.contains(&size) {
+ return Err(FoyerError::other(FetchSizeTooLarge));
+ }
+
+ // fetch the ENTIRE object from remote.
+ let (_, mut reader) = inner
+ .accessor
+ .read(
+ &path_clone,
+
OpRead::default().with_range(BytesRange::new(0, None)),
+ )
+ .await
+ .map_err(FoyerError::other)?;
+ let buffer =
reader.read_all().await.map_err(FoyerError::other)?;
+
+ Ok(FoyerValue(buffer))
+ }
+ },
+ )
+ .await;
+
+ // If got entry from cache, slice it to the requested range. If it's
larger than size_limit,
+ // we'll simply forward the request to the underlying accessor with
user's given range.
+ match result {
+ Ok(entry) => {
+ let end = range_end.unwrap_or(entry.len() as u64);
+ let range = BytesContentRange::default()
+ .with_range(range_start, end - 1)
+ .with_size(entry.len() as _);
+ let buffer = entry.slice(range_start as usize..end as usize);
+ let rp = RpRead::new()
+ .with_size(Some(buffer.len() as _))
+ .with_range(Some(range));
+ Ok((rp, buffer))
+ }
+ Err(e) => match e.downcast::<FetchSizeTooLarge>() {
+ Ok(_) => {
+ let (rp, mut reader) = self.inner.accessor.read(path,
original_args).await?;
+ let buffer = reader.read_all().await?;
+ Ok((rp, buffer))
+ }
+ Err(e) => Err(extract_err(e)),
+ },
+ }
+ }
+
+ fn write(
+ &self,
+ path: &str,
+ args: OpWrite,
+ ) -> impl Future<Output = Result<(RpWrite, Self::Writer)>> + MaybeSend {
+ let inner = self.inner.clone();
+ async move {
+ let (rp, w) = self.inner.accessor.write(path, args).await?;
+ Ok((
+ rp,
+ Writer {
+ w,
+ buf: QueueBuf::new(),
+ path: path.to_string(),
+ inner,
+ skip_cache: false,
+ },
+ ))
+ }
+ }
+
+ fn delete(&self) -> impl Future<Output = Result<(RpDelete,
Self::Deleter)>> + MaybeSend {
+ let inner = self.inner.clone();
+ async move {
+ let (rp, d) = inner.accessor.delete().await?;
+ Ok((
+ rp,
+ Deleter {
+ deleter: d,
+ keys: vec![],
+ inner,
+ },
+ ))
+ }
+ }
+
+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Lister)> {
+ self.inner.accessor.list(path, args).await
+ }
+
+ // TODO(MrCroxx): Implement copy, rename with foyer cache.
+}
+
+/// Writer returned by [`FoyerAccessor`]. It forwards every chunk to the
+/// wrapped writer and, while the object may still fit within `size_limit`,
+/// keeps a copy so the complete object can be cached on close.
+pub struct Writer<A: Access> {
+    w: A::Writer,
+    /// Copy of the bytes written so far; cleared once caching is abandoned.
+    buf: QueueBuf,
+    path: String,
+    inner: Arc<Inner<A>>,
+    /// Set once the written size reaches the upper size limit. Sticky: after
+    /// `buf` has been cleared the object is incomplete and must never be
+    /// cached, even if later chunks are small.
+    skip_cache: bool,
+}
+
+impl<A: Access> oio::Write for Writer<A> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
+        // Keep a copy only while the object can still be cached. The copy is
+        // appended after the inner write succeeds, so a failed (and possibly
+        // retried) write never leaves duplicated bytes in `buf`.
+        let copy = (!self.skip_cache).then(|| bs.clone());
+        self.w.write(bs).await?;
+
+        if let Some(copy) = copy {
+            // Only the upper bound can be decided mid-stream; the lower bound
+            // is checked in `close`, once the final size is known.
+            if self.buf.len() + copy.len() < self.inner.size_limit.end {
+                self.buf.push(copy);
+            } else {
+                self.buf.clear();
+                self.skip_cache = true;
+            }
+        }
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<Metadata> {
+        let metadata = self.w.close().await?;
+        if !self.skip_cache && self.inner.size_limit.contains(&self.buf.len()) {
+            let key = FoyerKey {
+                path: self.path.clone(),
+                version: metadata.version().map(|v| v.to_string()),
+            };
+            self.inner
+                .cache
+                .insert(key, FoyerValue(self.buf.clone().collect()));
+        }
+        Ok(metadata)
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        self.buf.clear();
+        self.w.abort().await
+    }
+}
+
+/// Deleter returned by [`FoyerAccessor`]. It forwards deletions to the
+/// wrapped deleter and remembers their cache keys so the entries can be
+/// evicted when the deleter is closed.
+pub struct Deleter<A: Access> {
+    deleter: A::Deleter,
+    /// Keys to evict from the cache on close.
+    keys: Vec<FoyerKey>,
+    inner: Arc<Inner<A>>,
+}
+
+impl<A: Access> oio::Delete for Deleter<A> {
+    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+        // Record the key before forwarding so the entry is evicted even if the
+        // backend call fails; dropping a cache entry is always safe.
+        self.keys.push(FoyerKey {
+            path: path.to_string(),
+            version: args.version().map(|v| v.to_string()),
+        });
+        self.deleter.delete(path, args).await
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        // Evict after the backend has processed the deletions. This narrows
+        // the window in which a concurrent read could repopulate the cache
+        // with an object that is about to disappear. Eviction happens
+        // whatever the outcome of `close`, and the keys are drained so a
+        // repeated close does not redo it.
+        let res = self.deleter.close().await;
+        for key in self.keys.drain(..) {
+            self.inner.cache.remove(&key);
+        }
+        res
+    }
+}
+
#[cfg(test)]
mod tests {
    use foyer::{
        DirectFsDeviceOptions, Engine, HybridCacheBuilder, LargeEngineOptions, RecoverMode,
    };
    use opendal_core::{Operator, services::Memory};
    use size::consts::MiB;
    use std::io::Cursor;

    use super::*;

    fn key(i: u8) -> String {
        format!("obj-{i}")
    }

    fn value(i: u8) -> Vec<u8> {
        // ~ 64KiB with metadata
        vec![i; 63 * 1024]
    }

    /// Round trip through the layer: write, read while cached, read again after clearing
    /// the cache (read-through to the backend), then delete and check nothing is readable.
    #[tokio::test]
    async fn test() {
        let dir = tempfile::tempdir().unwrap();

        let cache = HybridCacheBuilder::new()
            .memory(10)
            .with_shards(1)
            .storage(Engine::Large(LargeEngineOptions::new()))
            .with_device_options(
                DirectFsDeviceOptions::new(dir.path())
                    .with_capacity(16 * MiB as usize)
                    .with_file_size(MiB as usize),
            )
            .with_recover_mode(RecoverMode::None)
            .build()
            .await
            .unwrap();

        let op = Operator::new(Memory::default())
            .unwrap()
            .layer(FoyerLayer::new(cache.clone()))
            .finish();

        assert!(op.list("/").await.unwrap().is_empty());

        for i in 0..64 {
            op.write(&key(i), value(i)).await.unwrap();
        }

        assert_eq!(op.list("/").await.unwrap().len(), 64);

        for i in 0..64 {
            let buf = op.read(&key(i)).await.unwrap();
            assert_eq!(buf.to_vec(), value(i));
        }

        cache.clear().await.unwrap();

        for i in 0..64 {
            let buf = op.read(&key(i)).await.unwrap();
            assert_eq!(buf.to_vec(), value(i));
        }

        for i in 0..64 {
            op.delete(&key(i)).await.unwrap();
        }

        assert!(op.list("/").await.unwrap().is_empty());

        for i in 0..64 {
            let res = op.read(&key(i)).await;
            assert!(res.is_err(), "should fail to read deleted file");
        }
    }

    /// Objects inside and outside `size_limit` must all read back correctly, both before
    /// and after the cache is cleared.
    #[tokio::test]
    async fn test_size_limit() {
        let dir = tempfile::tempdir().unwrap();

        let cache = HybridCacheBuilder::new()
            .memory(1024 * 1024)
            .with_shards(1)
            .storage(Engine::Large(LargeEngineOptions::new()))
            .with_device_options(
                DirectFsDeviceOptions::new(dir.path())
                    .with_capacity(16 * MiB as usize)
                    .with_file_size(MiB as usize),
            )
            .with_recover_mode(RecoverMode::None)
            .build()
            .await
            .unwrap();

        // Set size limit: only cache files between 1KB and 10KB
        let op = Operator::new(Memory::default())
            .unwrap()
            .layer(FoyerLayer::new(cache.clone()).with_size_limit(1024..10 * 1024))
            .finish();

        let small_data = vec![1u8; 5 * 1024]; // 5KB - should be cached
        let large_data = vec![2u8; 20 * 1024]; // 20KB - should NOT be cached
        let tiny_data = vec![3u8; 512]; // 512B - below size limit, should NOT be cached

        // Write all files
        op.write("small.txt", small_data.clone()).await.unwrap();
        op.write("large.txt", large_data.clone()).await.unwrap();
        op.write("tiny.txt", tiny_data.clone()).await.unwrap();

        // All should be readable
        let read_small = op.read("small.txt").await.unwrap();
        assert_eq!(read_small.to_vec(), small_data);

        let read_large = op.read("large.txt").await.unwrap();
        assert_eq!(read_large.to_vec(), large_data);

        let read_tiny = op.read("tiny.txt").await.unwrap();
        assert_eq!(read_tiny.to_vec(), tiny_data);

        // Clear the cache to test read-through behavior
        cache.clear().await.unwrap();

        // All files should still be readable from underlying storage
        let read_small = op.read("small.txt").await.unwrap();
        assert_eq!(read_small.to_vec(), small_data);

        let read_large = op.read("large.txt").await.unwrap();
        assert_eq!(read_large.to_vec(), large_data);

        let read_tiny = op.read("tiny.txt").await.unwrap();
        assert_eq!(read_tiny.to_vec(), tiny_data);

        // Range reads must return the right slice whether an object is served from the
        // cache (small) or from the backend (large, tiny). These checks only verify read
        // correctness; they do not by themselves prove which objects were cached.
        let read_small_range = op.read_with("small.txt").range(0..1024).await.unwrap();
        assert_eq!(read_small_range.len(), 1024);
        assert_eq!(read_small_range.to_vec(), small_data[0..1024]);

        let read_large_range = op.read_with("large.txt").range(0..1024).await.unwrap();
        assert_eq!(read_large_range.len(), 1024);
        assert_eq!(read_large_range.to_vec(), large_data[0..1024]);

        let read_tiny_range = op.read_with("tiny.txt").range(0..256).await.unwrap();
        assert_eq!(read_tiny_range.len(), 256);
        assert_eq!(read_tiny_range.to_vec(), tiny_data[0..256]);
    }

    #[test]
    fn test_error() {
        let e = Error::new(ErrorKind::NotFound, "not found");
        let fe = FoyerError::other(e);
        let oe = extract_err(fe);
        assert_eq!(oe.kind(), ErrorKind::NotFound);
    }

    #[test]
    fn test_foyer_key_version_none_vs_empty() {
        let key_none = FoyerKey {
            path: "test/path".to_string(),
            version: None,
        };

        let key_empty = FoyerKey {
            path: "test/path".to_string(),
            version: Some("".to_string()),
        };

        let mut buf_none = Vec::new();
        key_none.encode(&mut buf_none).unwrap();

        let mut buf_empty = Vec::new();
        key_empty.encode(&mut buf_empty).unwrap();

        assert_ne!(
            buf_none, buf_empty,
            "Serialization of version=None and version=\"\" should be different"
        );

        let decoded_none = FoyerKey::decode(&mut Cursor::new(&buf_none)).unwrap();
        assert_eq!(decoded_none, key_none);
        let decoded_empty = FoyerKey::decode(&mut Cursor::new(&buf_empty)).unwrap();
        assert_eq!(decoded_empty, key_empty);
    }

    /// `decode(encode(key))` must round-trip for a variety of paths and versions.
    #[test]
    fn test_foyer_key_serde() {
        let test_cases = vec![
            FoyerKey {
                path: "simple".to_string(),
                version: None,
            },
            FoyerKey {
                path: "with/slash/path".to_string(),
                version: None,
            },
            FoyerKey {
                path: "versioned".to_string(),
                version: Some("v1.0.0".to_string()),
            },
            FoyerKey {
                path: "empty-version".to_string(),
                version: Some("".to_string()),
            },
            FoyerKey {
                path: "".to_string(),
                version: None,
            },
            FoyerKey {
                path: "unicode/θ·―εΎ/π".to_string(),
                version: Some("ηζ¬-1".to_string()),
            },
            FoyerKey {
                path: "long/".to_string().repeat(100),
                version: Some("long-version-".to_string().repeat(50)),
            },
        ];

        for original in test_cases {
            let mut buffer = Vec::new();
            original
                .encode(&mut buffer)
                .expect("encoding should succeed");

            let decoded =
                FoyerKey::decode(&mut Cursor::new(&buffer)).expect("decoding should succeed");

            assert_eq!(
                decoded, original,
                "decode(encode(key)) should equal original key"
            );
        }
    }
}
diff --git a/core/src/lib.rs b/core/src/lib.rs
index da5589d77..2205a3fb0 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -385,6 +385,8 @@ pub mod layers {
pub use opendal_layer_fastmetrics::*;
#[cfg(feature = "layers-fastrace")]
pub use opendal_layer_fastrace::*;
+ #[cfg(feature = "layers-foyer")]
+ pub use opendal_layer_foyer::*;
#[cfg(feature = "layers-hotpath")]
pub use opendal_layer_hotpath::*;
#[cfg(feature = "layers-immutable-index")]