This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 8f9698cd refactor: remove async remote write decode logic (#1631)
8f9698cd is described below
commit 8f9698cdf83d02e12b93581464114d4e7cef483a
Author: Peiyang He <[email protected]>
AuthorDate: Thu Dec 11 15:50:23 2025 +0800
refactor: remove async remote write decode logic (#1631)
## Rationale
Deserialization is a CPU-bound task, so making it async does not
provide significant benefits. In addition, async is contagious: every
caller of an async function must itself become async, which spreads
through the call chain. Therefore, the async decode logic is removed.
## Detailed Changes
- Remove `decode_async` interface
- Change the back-end object pool dependency from `deadpool` to
`object-pool`
- Change benchmark logic accordingly
## Test Plan
Manual test
---
Cargo.lock | 897 +++++++++++++++++----
Cargo.toml | 4 +-
.../assets/remote-write-concurrent-performance.png | Bin 578747 -> 460173 bytes
docs/assets/remote-write-memory-performance.png | Bin 119032 -> 55817 bytes
.../assets/remote-write-sequential-performance.png | Bin 620542 -> 607779 bytes
src/benchmarks/Cargo.toml | 14 +-
src/benchmarks/benches/bench.rs | 42 +-
src/benchmarks/remote_write_memory_bench.py | 284 ++-----
src/benchmarks/src/bin/parser_mem.rs | 190 +++--
src/benchmarks/src/bin/pool_stats.rs | 82 --
src/benchmarks/src/remote_write_bench.rs | 123 ++-
src/benchmarks/src/util.rs | 140 +---
src/remote_write/Cargo.toml | 4 +-
src/remote_write/README.md | 98 +--
src/remote_write/src/pooled_parser.rs | 24 +-
src/remote_write/src/pooled_types.rs | 38 +-
src/remote_write/tests/equivalence_test.rs | 28 +-
17 files changed, 1093 insertions(+), 875 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index a97698ea..031004da 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -40,7 +40,7 @@ dependencies = [
"flate2",
"futures-core",
"h2",
- "http",
+ "http 0.2.12",
"httparse",
"httpdate",
"itoa",
@@ -76,7 +76,7 @@ checksum =
"13d324164c51f63867b57e73ba5936ea151b8a41a1d23d1031eeb9f70d0236f8"
dependencies = [
"bytestring",
"cfg-if",
- "http",
+ "http 0.2.12",
"regex",
"regex-lite",
"serde",
@@ -105,7 +105,7 @@ dependencies = [
"futures-core",
"futures-util",
"mio",
- "socket2",
+ "socket2 0.5.7",
"tokio",
"tracing",
]
@@ -150,7 +150,7 @@ dependencies = [
"bytes",
"bytestring",
"cfg-if",
- "cookie",
+ "cookie 0.16.2",
"derive_more",
"encoding_rs",
"futures-core",
@@ -168,7 +168,7 @@ dependencies = [
"serde_json",
"serde_urlencoded",
"smallvec",
- "socket2",
+ "socket2 0.5.7",
"time",
"url",
]
@@ -185,15 +185,6 @@ dependencies = [
"syn",
]
-[[package]]
-name = "addr2line"
-version = "0.24.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f5fb1d8e4442bd405fdfd1dacb42792696b0cf9cb15882e5d097b742a676d375"
-dependencies = [
- "gimli",
-]
-
[[package]]
name = "adler2"
version = "2.0.0"
@@ -320,6 +311,12 @@ version = "1.0.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10f00e1f6e58a40e807377c75c6a7f97bf9044fab57816f2414e6f5f4499d7b8"
+[[package]]
+name = "arc-swap"
+version = "1.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
+
[[package]]
name = "arrayref"
version = "0.3.8"
@@ -381,7 +378,7 @@ dependencies = [
"chrono",
"chrono-tz",
"half",
- "hashbrown",
+ "hashbrown 0.14.5",
"num",
]
@@ -604,21 +601,6 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
-[[package]]
-name = "backtrace"
-version = "0.3.74"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a"
-dependencies = [
- "addr2line",
- "cfg-if",
- "libc",
- "miniz_oxide",
- "object",
- "rustc-demangle",
- "windows-targets",
-]
-
[[package]]
name = "base64"
version = "0.22.1"
@@ -629,11 +611,12 @@ checksum =
"72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
name = "benchmarks"
version = "2.2.0-alpha"
dependencies = [
+ "anyhow",
"bytes",
"columnar_storage",
"common",
"criterion",
- "deadpool",
+ "hotpath",
"num_cpus",
"pb_types",
"prost",
@@ -643,8 +626,6 @@ dependencies = [
"remote_write",
"serde",
"serde_json",
- "tikv-jemalloc-ctl",
- "tikv-jemallocator",
"tokio",
"toml",
"tracing",
@@ -782,10 +763,11 @@ checksum =
"37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "cc"
-version = "1.1.18"
+version = "1.2.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b62ac837cdb5cb22e10a256099b4fc502b1dfe560cb282963a974d7abd80e476"
+checksum = "739eb0f94557554b3ca9a86d2d37bebd49c5e6d0c1d2bda35ba5bdac830befc2"
dependencies = [
+ "find-msvc-tools",
"jobserver",
"libc",
"shlex",
@@ -806,7 +788,7 @@ dependencies = [
"android-tzdata",
"iana-time-zone",
"num-traits",
- "windows-targets",
+ "windows-targets 0.52.6",
]
[[package]]
@@ -903,6 +885,15 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
+[[package]]
+name = "colored"
+version = "3.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fde0e0ec90c9dfb3b4b1a0891a7dcd0e2bffde2f7efed5fe7c9bb00e5bfb915e"
+dependencies = [
+ "windows-sys 0.59.0",
+]
+
[[package]]
name = "columnar_storage"
version = "2.2.0-alpha"
@@ -993,6 +984,35 @@ dependencies = [
"version_check",
]
+[[package]]
+name = "cookie"
+version = "0.18.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4ddef33a339a91ea89fb53151bd0a4689cfce27055c291dfa69945475d22c747"
+dependencies = [
+ "percent-encoding",
+ "time",
+ "version_check",
+]
+
+[[package]]
+name = "cookie_store"
+version = "0.22.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3fc4bff745c9b4c7fb1e97b25d13153da2bc7796260141df62378998d070207f"
+dependencies = [
+ "cookie 0.18.1",
+ "document-features",
+ "idna 1.1.0",
+ "indexmap",
+ "log",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "time",
+ "url",
+]
+
[[package]]
name = "core-foundation-sys"
version = "0.8.7"
@@ -1053,6 +1073,15 @@ dependencies = [
"itertools 0.10.5",
]
+[[package]]
+name = "crossbeam-channel"
+version = "0.5.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
+dependencies = [
+ "crossbeam-utils",
+]
+
[[package]]
name = "crossbeam-deque"
version = "0.8.6"
@@ -1123,7 +1152,7 @@ checksum =
"5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if",
"crossbeam-utils",
- "hashbrown",
+ "hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
@@ -1165,7 +1194,7 @@ dependencies = [
"futures",
"glob",
"half",
- "hashbrown",
+ "hashbrown 0.14.5",
"indexmap",
"itertools 0.13.0",
"log",
@@ -1214,7 +1243,7 @@ dependencies = [
"arrow-schema",
"chrono",
"half",
- "hashbrown",
+ "hashbrown 0.14.5",
"indexmap",
"instant",
"libc",
@@ -1248,7 +1277,7 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"futures",
- "hashbrown",
+ "hashbrown 0.14.5",
"log",
"object_store",
"parking_lot",
@@ -1308,7 +1337,7 @@ dependencies = [
"datafusion-common",
"datafusion-execution",
"datafusion-expr",
- "hashbrown",
+ "hashbrown 0.14.5",
"hex",
"itertools 0.13.0",
"log",
@@ -1415,7 +1444,7 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"datafusion-physical-expr",
- "hashbrown",
+ "hashbrown 0.14.5",
"indexmap",
"itertools 0.13.0",
"log",
@@ -1443,7 +1472,7 @@ dependencies = [
"datafusion-functions-aggregate-common",
"datafusion-physical-expr-common",
"half",
- "hashbrown",
+ "hashbrown 0.14.5",
"indexmap",
"itertools 0.13.0",
"log",
@@ -1461,7 +1490,7 @@ dependencies = [
"arrow",
"datafusion-common",
"datafusion-expr-common",
- "hashbrown",
+ "hashbrown 0.14.5",
"rand",
]
@@ -1505,7 +1534,7 @@ dependencies = [
"datafusion-physical-expr-common",
"futures",
"half",
- "hashbrown",
+ "hashbrown 0.14.5",
"indexmap",
"itertools 0.13.0",
"log",
@@ -1534,24 +1563,6 @@ dependencies = [
"strum",
]
-[[package]]
-name = "deadpool"
-version = "0.10.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490"
-dependencies = [
- "async-trait",
- "deadpool-runtime",
- "num_cpus",
- "tokio",
-]
-
-[[package]]
-name = "deadpool-runtime"
-version = "0.1.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b"
-
[[package]]
name = "deranged"
version = "0.3.11"
@@ -1585,12 +1596,59 @@ dependencies = [
"subtle",
]
+[[package]]
+name = "dirs-next"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1"
+dependencies = [
+ "cfg-if",
+ "dirs-sys-next",
+]
+
+[[package]]
+name = "dirs-sys-next"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d"
+dependencies = [
+ "libc",
+ "redox_users",
+ "winapi",
+]
+
+[[package]]
+name = "displaydoc"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "document-features"
+version = "0.2.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d4b8a88685455ed29a21542a33abd9cb6510b6b129abadabdcef0f4c55bc8f61"
+dependencies = [
+ "litrs",
+]
+
[[package]]
name = "either"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0"
+[[package]]
+name = "encode_unicode"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0"
+
[[package]]
name = "encoding_rs"
version = "0.8.35"
@@ -1637,12 +1695,28 @@ dependencies = [
"windows-sys 0.52.0",
]
+[[package]]
+name = "eyre"
+version = "0.6.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7cd915d99f24784cdc19fd37ef22b97e3ff0ae756c7e492e9fbfe897d61e2aec"
+dependencies = [
+ "indenter",
+ "once_cell",
+]
+
[[package]]
name = "fastrand"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
+[[package]]
+name = "find-msvc-tools"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127"
+
[[package]]
name = "fixedbitset"
version = "0.4.2"
@@ -1794,12 +1868,6 @@ dependencies = [
"wasi",
]
-[[package]]
-name = "gimli"
-version = "0.31.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64"
-
[[package]]
name = "glob"
version = "0.3.1"
@@ -1817,7 +1885,7 @@ dependencies = [
"futures-core",
"futures-sink",
"futures-util",
- "http",
+ "http 0.2.12",
"indexmap",
"slab",
"tokio",
@@ -1846,6 +1914,22 @@ dependencies = [
"allocator-api2",
]
+[[package]]
+name = "hashbrown"
+version = "0.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d"
+
+[[package]]
+name = "hdrhistogram"
+version = "7.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d"
+dependencies = [
+ "byteorder",
+ "num-traits",
+]
+
[[package]]
name = "heck"
version = "0.5.0"
@@ -1879,6 +1963,39 @@ dependencies = [
"windows-sys 0.59.0",
]
+[[package]]
+name = "hotpath"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6189994b73d164becf3187cc046e68b3f7e6c29e2ef61c30c9e75601fd148daf"
+dependencies = [
+ "arc-swap",
+ "cfg-if",
+ "clap",
+ "colored",
+ "crossbeam-channel",
+ "eyre",
+ "hdrhistogram",
+ "hotpath-macros",
+ "prettytable-rs",
+ "quanta",
+ "serde",
+ "serde_json",
+ "tokio",
+ "ureq",
+]
+
+[[package]]
+name = "hotpath-macros"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "15ec66a715a703f2e7b7b84f468b4f0187172ef998baa62fcce8d1174d9edb11"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "http"
version = "0.2.12"
@@ -1890,6 +2007,17 @@ dependencies = [
"itoa",
]
+[[package]]
+name = "http"
+version = "1.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f4a85d31aea989eead29a3aaf9e1115a180df8282431156e533de47660892565"
+dependencies = [
+ "bytes",
+ "fnv",
+ "itoa",
+]
+
[[package]]
name = "httparse"
version = "1.9.5"
@@ -1931,6 +2059,87 @@ dependencies = [
"cc",
]
+[[package]]
+name = "icu_collections"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43"
+dependencies = [
+ "displaydoc",
+ "potential_utf",
+ "yoke",
+ "zerofrom",
+ "zerovec",
+]
+
+[[package]]
+name = "icu_locale_core"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6"
+dependencies = [
+ "displaydoc",
+ "litemap",
+ "tinystr",
+ "writeable",
+ "zerovec",
+]
+
+[[package]]
+name = "icu_normalizer"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599"
+dependencies = [
+ "icu_collections",
+ "icu_normalizer_data",
+ "icu_properties",
+ "icu_provider",
+ "smallvec",
+ "zerovec",
+]
+
+[[package]]
+name = "icu_normalizer_data"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a"
+
+[[package]]
+name = "icu_properties"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e93fcd3157766c0c8da2f8cff6ce651a31f0810eaa1c51ec363ef790bbb5fb99"
+dependencies = [
+ "icu_collections",
+ "icu_locale_core",
+ "icu_properties_data",
+ "icu_provider",
+ "zerotrie",
+ "zerovec",
+]
+
+[[package]]
+name = "icu_properties_data"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "02845b3647bb045f1100ecd6480ff52f34c35f82d9880e029d329c21d1054899"
+
+[[package]]
+name = "icu_provider"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614"
+dependencies = [
+ "displaydoc",
+ "icu_locale_core",
+ "writeable",
+ "yoke",
+ "zerofrom",
+ "zerotrie",
+ "zerovec",
+]
+
[[package]]
name = "idna"
version = "0.5.0"
@@ -1941,20 +2150,47 @@ dependencies = [
"unicode-normalization",
]
+[[package]]
+name = "idna"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de"
+dependencies = [
+ "idna_adapter",
+ "smallvec",
+ "utf8_iter",
+]
+
+[[package]]
+name = "idna_adapter"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344"
+dependencies = [
+ "icu_normalizer",
+ "icu_properties",
+]
+
[[package]]
name = "impl-more"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aae21c3177a27788957044151cc2800043d127acaa460a47ebb9b84dfa2c6aa0"
+[[package]]
+name = "indenter"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "964de6e86d545b246d84badc0fef527924ace5134f30641c203ef52ba83f58d5"
+
[[package]]
name = "indexmap"
-version = "2.5.0"
+version = "2.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5"
+checksum = "6717a8d2a5a929a1a2eb43a12812498ed141a0bcfb7e8f7844fbdbe4303bba9f"
dependencies = [
"equivalent",
- "hashbrown",
+ "hashbrown 0.16.0",
]
[[package]]
@@ -2118,9 +2354,9 @@ dependencies = [
[[package]]
name = "libc"
-version = "0.2.158"
+version = "0.2.177"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439"
+checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976"
[[package]]
name = "libm"
@@ -2128,12 +2364,34 @@ version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058"
+[[package]]
+name = "libredox"
+version = "0.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb"
+dependencies = [
+ "bitflags 2.6.0",
+ "libc",
+]
+
[[package]]
name = "linux-raw-sys"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
+[[package]]
+name = "litemap"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77"
+
+[[package]]
+name = "litrs"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "11d3d7f243d5c5a8b9bb5d6dd2b1602c0cb0b9db1621bafc7ed66e35ff9fe092"
+
[[package]]
name = "local-channel"
version = "0.1.5"
@@ -2163,9 +2421,9 @@ dependencies = [
[[package]]
name = "log"
-version = "0.4.22"
+version = "0.4.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
+checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432"
[[package]]
name = "lz4_flex"
@@ -2371,12 +2629,12 @@ dependencies = [
]
[[package]]
-name = "object"
-version = "0.36.4"
+name = "object-pool"
+version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "084f1a5821ac4c651660a94a7153d27ac9d8a53736203f58b31945ded098070a"
+checksum = "ceffa2e6ccecd71e60a0f06b655df2c66acd1c0c892dafefc96fd49d65f71d53"
dependencies = [
- "memchr",
+ "parking_lot",
]
[[package]]
@@ -2447,7 +2705,7 @@ dependencies = [
"libc",
"redox_syscall",
"smallvec",
- "windows-targets",
+ "windows-targets 0.52.6",
]
[[package]]
@@ -2471,7 +2729,7 @@ dependencies = [
"flate2",
"futures",
"half",
- "hashbrown",
+ "hashbrown 0.14.5",
"lz4_flex",
"num",
"num-bigint",
@@ -2629,6 +2887,15 @@ dependencies = [
"plotters-backend",
]
+[[package]]
+name = "potential_utf"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b73949432f5e2a09657003c25bca5e19a0e9c84f8058ca374f49e0ebe605af77"
+dependencies = [
+ "zerovec",
+]
+
[[package]]
name = "powerfmt"
version = "0.2.0"
@@ -2654,6 +2921,19 @@ dependencies = [
"syn",
]
+[[package]]
+name = "prettytable-rs"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eea25e07510aa6ab6547308ebe3c036016d162b8da920dbb079e3ba8acf3d95a"
+dependencies = [
+ "encode_unicode",
+ "is-terminal",
+ "lazy_static",
+ "term",
+ "unicode-width",
+]
+
[[package]]
name = "proc-macro2"
version = "1.0.86"
@@ -2767,6 +3047,21 @@ dependencies = [
"thiserror",
]
+[[package]]
+name = "quanta"
+version = "0.12.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7"
+dependencies = [
+ "crossbeam-utils",
+ "libc",
+ "once_cell",
+ "raw-cpuid",
+ "wasi",
+ "web-sys",
+ "winapi",
+]
+
[[package]]
name = "quick-protobuf"
version = "0.8.1"
@@ -2815,6 +3110,15 @@ dependencies = [
"getrandom",
]
+[[package]]
+name = "raw-cpuid"
+version = "11.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186"
+dependencies = [
+ "bitflags 2.6.0",
+]
+
[[package]]
name = "rayon"
version = "1.10.0"
@@ -2844,6 +3148,17 @@ dependencies = [
"bitflags 2.6.0",
]
+[[package]]
+name = "redox_users"
+version = "0.4.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43"
+dependencies = [
+ "getrandom",
+ "libredox",
+ "thiserror",
+]
+
[[package]]
name = "regex"
version = "1.10.6"
@@ -2899,20 +3214,26 @@ name = "remote_write"
version = "2.2.0-alpha"
dependencies = [
"anyhow",
- "async-trait",
"bytes",
- "deadpool",
+ "object-pool",
"once_cell",
"pb_types",
"prost",
- "tokio",
]
[[package]]
-name = "rustc-demangle"
-version = "0.1.24"
+name = "ring"
+version = "0.17.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
+checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
+dependencies = [
+ "cc",
+ "cfg-if",
+ "getrandom",
+ "libc",
+ "untrusted",
+ "windows-sys 0.52.0",
+]
[[package]]
name = "rustc_version"
@@ -2936,6 +3257,50 @@ dependencies = [
"windows-sys 0.52.0",
]
+[[package]]
+name = "rustls"
+version = "0.23.34"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6a9586e9ee2b4f8fab52a0048ca7334d7024eef48e2cb9407e3497bb7cab7fa7"
+dependencies = [
+ "log",
+ "once_cell",
+ "ring",
+ "rustls-pki-types",
+ "rustls-webpki",
+ "subtle",
+ "zeroize",
+]
+
+[[package]]
+name = "rustls-pemfile"
+version = "2.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50"
+dependencies = [
+ "rustls-pki-types",
+]
+
+[[package]]
+name = "rustls-pki-types"
+version = "1.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "94182ad936a0c91c324cd46c6511b9510ed16af436d7b5bab34beab0afd55f7a"
+dependencies = [
+ "zeroize",
+]
+
+[[package]]
+name = "rustls-webpki"
+version = "0.103.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2ffdfa2f5286e2247234e03f680868ac2815974dc39e00ea15adc445d0aafe52"
+dependencies = [
+ "ring",
+ "rustls-pki-types",
+ "untrusted",
+]
+
[[package]]
name = "rustversion"
version = "1.0.17"
@@ -3157,6 +3522,16 @@ dependencies = [
"windows-sys 0.52.0",
]
+[[package]]
+name = "socket2"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881"
+dependencies = [
+ "libc",
+ "windows-sys 0.60.2",
+]
+
[[package]]
name = "sqlparser"
version = "0.51.0"
@@ -3178,6 +3553,12 @@ dependencies = [
"syn",
]
+[[package]]
+name = "stable_deref_trait"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596"
+
[[package]]
name = "static_assertions"
version = "1.1.0"
@@ -3229,6 +3610,17 @@ dependencies = [
"unicode-ident",
]
+[[package]]
+name = "synstructure"
+version = "0.13.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "temp-dir"
version = "0.1.14"
@@ -3248,6 +3640,17 @@ dependencies = [
"windows-sys 0.59.0",
]
+[[package]]
+name = "term"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c59df8ac95d96ff9bede18eb7300b0fda5e5d8d90960e76f8e14ae765eedbf1f"
+dependencies = [
+ "dirs-next",
+ "rustversion",
+ "winapi",
+]
+
[[package]]
name = "test-log"
version = "0.2.16"
@@ -3311,37 +3714,6 @@ dependencies = [
"ordered-float",
]
-[[package]]
-name = "tikv-jemalloc-ctl"
-version = "0.5.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "619bfed27d807b54f7f776b9430d4f8060e66ee138a28632ca898584d462c31c"
-dependencies = [
- "libc",
- "paste",
- "tikv-jemalloc-sys",
-]
-
-[[package]]
-name = "tikv-jemalloc-sys"
-version = "0.5.4+5.3.0-patched"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9402443cb8fd499b6f327e40565234ff34dbda27460c5b47db0db77443dd85d1"
-dependencies = [
- "cc",
- "libc",
-]
-
-[[package]]
-name = "tikv-jemallocator"
-version = "0.5.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "965fe0c26be5c56c94e38ba547249074803efd52adfb66de62107d95aab3eaca"
-dependencies = [
- "libc",
- "tikv-jemalloc-sys",
-]
-
[[package]]
name = "time"
version = "0.3.37"
@@ -3384,6 +3756,16 @@ dependencies = [
"crunchy",
]
+[[package]]
+name = "tinystr"
+version = "0.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "42d3e9c45c09de15d06dd8acf5f4e0e399e85927b7f00711024eb7ae10fa4869"
+dependencies = [
+ "displaydoc",
+ "zerovec",
+]
+
[[package]]
name = "tinytemplate"
version = "1.2.1"
@@ -3411,27 +3793,26 @@ checksum =
"1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
-version = "1.40.0"
+version = "1.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998"
+checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408"
dependencies = [
- "backtrace",
"bytes",
"libc",
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
- "socket2",
+ "socket2 0.6.1",
"tokio-macros",
- "windows-sys 0.52.0",
+ "windows-sys 0.61.2",
]
[[package]]
name = "tokio-macros"
-version = "2.4.0"
+version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
+checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5"
dependencies = [
"proc-macro2",
"quote",
@@ -3597,6 +3978,45 @@ version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d"
+[[package]]
+name = "untrusted"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
+
+[[package]]
+name = "ureq"
+version = "3.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "99ba1025f18a4a3fc3e9b48c868e9beb4f24f4b4b1a325bada26bd4119f46537"
+dependencies = [
+ "base64",
+ "cookie_store",
+ "flate2",
+ "log",
+ "percent-encoding",
+ "rustls",
+ "rustls-pemfile",
+ "rustls-pki-types",
+ "serde",
+ "serde_json",
+ "ureq-proto",
+ "utf-8",
+ "webpki-roots",
+]
+
+[[package]]
+name = "ureq-proto"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "60b4531c118335662134346048ddb0e54cc86bd7e81866757873055f0e38f5d2"
+dependencies = [
+ "base64",
+ "http 1.3.1",
+ "httparse",
+ "log",
+]
+
[[package]]
name = "url"
version = "2.5.2"
@@ -3604,10 +4024,22 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c"
dependencies = [
"form_urlencoded",
- "idna",
+ "idna 0.5.0",
"percent-encoding",
]
+[[package]]
+name = "utf-8"
+version = "0.7.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
+
+[[package]]
+name = "utf8_iter"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
+
[[package]]
name = "utf8parse"
version = "0.2.2"
@@ -3716,6 +4148,15 @@ dependencies = [
"wasm-bindgen",
]
+[[package]]
+name = "webpki-roots"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "32b130c0d2d49f8b6889abc456e795e82525204f27c42cf767cf0d7734e089b8"
+dependencies = [
+ "rustls-pki-types",
+]
+
[[package]]
name = "which"
version = "4.4.2"
@@ -3765,16 +4206,22 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
dependencies = [
- "windows-targets",
+ "windows-targets 0.52.6",
]
+[[package]]
+name = "windows-link"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
+
[[package]]
name = "windows-sys"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
- "windows-targets",
+ "windows-targets 0.52.6",
]
[[package]]
@@ -3783,7 +4230,25 @@ version = "0.59.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b"
dependencies = [
- "windows-targets",
+ "windows-targets 0.52.6",
+]
+
+[[package]]
+name = "windows-sys"
+version = "0.60.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb"
+dependencies = [
+ "windows-targets 0.53.5",
+]
+
+[[package]]
+name = "windows-sys"
+version = "0.61.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc"
+dependencies = [
+ "windows-link",
]
[[package]]
@@ -3792,14 +4257,31 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
dependencies = [
- "windows_aarch64_gnullvm",
- "windows_aarch64_msvc",
- "windows_i686_gnu",
- "windows_i686_gnullvm",
- "windows_i686_msvc",
- "windows_x86_64_gnu",
- "windows_x86_64_gnullvm",
- "windows_x86_64_msvc",
+ "windows_aarch64_gnullvm 0.52.6",
+ "windows_aarch64_msvc 0.52.6",
+ "windows_i686_gnu 0.52.6",
+ "windows_i686_gnullvm 0.52.6",
+ "windows_i686_msvc 0.52.6",
+ "windows_x86_64_gnu 0.52.6",
+ "windows_x86_64_gnullvm 0.52.6",
+ "windows_x86_64_msvc 0.52.6",
+]
+
+[[package]]
+name = "windows-targets"
+version = "0.53.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3"
+dependencies = [
+ "windows-link",
+ "windows_aarch64_gnullvm 0.53.1",
+ "windows_aarch64_msvc 0.53.1",
+ "windows_i686_gnu 0.53.1",
+ "windows_i686_gnullvm 0.53.1",
+ "windows_i686_msvc 0.53.1",
+ "windows_x86_64_gnu 0.53.1",
+ "windows_x86_64_gnullvm 0.53.1",
+ "windows_x86_64_msvc 0.53.1",
]
[[package]]
@@ -3808,48 +4290,96 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
+[[package]]
+name = "windows_aarch64_gnullvm"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53"
+
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
+[[package]]
+name = "windows_aarch64_msvc"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006"
+
[[package]]
name = "windows_i686_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
+[[package]]
+name = "windows_i686_gnu"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3"
+
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
+[[package]]
+name = "windows_i686_gnullvm"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c"
+
[[package]]
name = "windows_i686_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
+[[package]]
+name = "windows_i686_msvc"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2"
+
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
+[[package]]
+name = "windows_x86_64_gnu"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499"
+
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
+[[package]]
+name = "windows_x86_64_gnullvm"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1"
+
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
+[[package]]
+name = "windows_x86_64_msvc"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650"
+
[[package]]
name = "winnow"
version = "0.6.20"
@@ -3859,6 +4389,12 @@ dependencies = [
"memchr",
]
+[[package]]
+name = "writeable"
+version = "0.6.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9"
+
[[package]]
name = "xz2"
version = "0.1.7"
@@ -3868,6 +4404,29 @@ dependencies = [
"lzma-sys",
]
+[[package]]
+name = "yoke"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "72d6e5c6afb84d73944e5cedb052c4680d5657337201555f9f2a16b7406d4954"
+dependencies = [
+ "stable_deref_trait",
+ "yoke-derive",
+ "zerofrom",
+]
+
+[[package]]
+name = "yoke-derive"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+ "synstructure",
+]
+
[[package]]
name = "zerocopy"
version = "0.7.35"
@@ -3889,6 +4448,66 @@ dependencies = [
"syn",
]
+[[package]]
+name = "zerofrom"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5"
+dependencies = [
+ "zerofrom-derive",
+]
+
+[[package]]
+name = "zerofrom-derive"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+ "synstructure",
+]
+
+[[package]]
+name = "zeroize"
+version = "1.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0"
+
+[[package]]
+name = "zerotrie"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2a59c17a5562d507e4b54960e8569ebee33bee890c70aa3fe7b97e85a9fd7851"
+dependencies = [
+ "displaydoc",
+ "yoke",
+ "zerofrom",
+]
+
+[[package]]
+name = "zerovec"
+version = "0.11.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6c28719294829477f525be0186d13efa9a3c602f7ec202ca9e353d310fb9a002"
+dependencies = [
+ "yoke",
+ "zerofrom",
+ "zerovec-derive",
+]
+
+[[package]]
+name = "zerovec-derive"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
[[package]]
name = "zstd"
version = "0.13.2"
diff --git a/Cargo.toml b/Cargo.toml
index b4bab12b..2ced2a40 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -51,11 +51,13 @@ columnar_storage = { path = "src/columnar_storage" }
common = { path = "src/common" }
criterion = "0.5"
datafusion = "43"
-deadpool = "0.10"
futures = "0.3"
+hotpath = "0.5.2"
itertools = "0.3"
lazy_static = "1"
metric_engine = { path = "src/metric_engine" }
+num_cpus = "1"
+object-pool = "0.6"
object_store = { version = "0.11" }
once_cell = "1"
parquet = { version = "53" }
diff --git a/docs/assets/remote-write-concurrent-performance.png b/docs/assets/remote-write-concurrent-performance.png
index 15d185a1..55b04fb8 100644
Binary files a/docs/assets/remote-write-concurrent-performance.png and b/docs/assets/remote-write-concurrent-performance.png differ
diff --git a/docs/assets/remote-write-memory-performance.png b/docs/assets/remote-write-memory-performance.png
index c5859820..5b113eed 100644
Binary files a/docs/assets/remote-write-memory-performance.png and b/docs/assets/remote-write-memory-performance.png differ
diff --git a/docs/assets/remote-write-sequential-performance.png b/docs/assets/remote-write-sequential-performance.png
index a401c285..c5f7942c 100644
Binary files a/docs/assets/remote-write-sequential-performance.png and b/docs/assets/remote-write-sequential-performance.png differ
diff --git a/src/benchmarks/Cargo.toml b/src/benchmarks/Cargo.toml
index de738d01..56cdfa32 100644
--- a/src/benchmarks/Cargo.toml
+++ b/src/benchmarks/Cargo.toml
@@ -29,18 +29,19 @@ description.workspace = true
name = "parser_mem"
path = "src/bin/parser_mem.rs"
-[[bin]]
-name = "pool_stats"
-path = "src/bin/pool_stats.rs"
-
[features]
+default = ["hotpath", "hotpath-alloc-bytes-total"]
+hotpath = []
+hotpath-alloc-bytes-total = ["hotpath", "hotpath/hotpath-alloc-bytes-total"]
unsafe-split = ["remote_write/unsafe-split"]
[dependencies]
+anyhow = { workspace = true }
bytes = { workspace = true }
columnar_storage = { workspace = true }
common = { workspace = true }
-deadpool = { workspace = true }
+hotpath = { workspace = true }
+num_cpus = { workspace = true }
pb_types = { workspace = true }
prost = { workspace = true }
protobuf = "3.7"
@@ -48,21 +49,18 @@ quick-protobuf = "0.8"
remote_write = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
-tikv-jemalloc-ctl = "0.5"
tokio = { workspace = true }
toml = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
[target.'cfg(not(target_env = "msvc"))'.dependencies]
-tikv-jemallocator = "0.5"
[build-dependencies]
protobuf-codegen = "3.7"
[dev-dependencies]
criterion = { workspace = true }
-num_cpus = "1.16"
[[bench]]
name = "bench"
diff --git a/src/benchmarks/benches/bench.rs b/src/benchmarks/benches/bench.rs
index b932a441..13fe083b 100644
--- a/src/benchmarks/benches/bench.rs
+++ b/src/benchmarks/benches/bench.rs
@@ -64,12 +64,6 @@ fn bench_remote_write(c: &mut Criterion) {
let concurrent_scales = config.remote_write.concurrent_scales.clone();
let bench = RefCell::new(RemoteWriteBench::new(config.remote_write));
- let rt = tokio::runtime::Builder::new_multi_thread()
- .worker_threads(num_cpus::get())
- .enable_all()
- .build()
- .unwrap();
-
// Sequential parse bench.
let mut group = c.benchmark_group("remote_write_sequential");
@@ -85,10 +79,10 @@ fn bench_remote_write(c: &mut Criterion) {
group.bench_with_input(
BenchmarkId::new("pooled", n),
- &(&bench, &rt, n),
- |b, (bench, rt, scale)| {
+ &(&bench, n),
+ |b, (bench, scale)| {
let bench = bench.borrow();
- b.iter(|| rt.block_on(bench.pooled_parser_sequential(*scale)).unwrap())
+ b.iter(|| bench.pooled_parser_sequential(*scale).unwrap())
},
);
@@ -118,43 +112,37 @@ fn bench_remote_write(c: &mut Criterion) {
for &scale in &concurrent_scales {
group.bench_with_input(
BenchmarkId::new("prost", scale),
- &(&bench, &rt, scale),
- |b, (bench, rt, scale)| {
+ &(&bench, scale),
+ |b, (bench, scale)| {
let bench = bench.borrow();
- b.iter(|| rt.block_on(bench.prost_parser_concurrent(*scale)).unwrap())
+ b.iter(|| bench.prost_parser_concurrent(*scale).unwrap())
},
);
group.bench_with_input(
BenchmarkId::new("pooled", scale),
- &(&bench, &rt, scale),
- |b, (bench, rt, scale)| {
+ &(&bench, scale),
+ |b, (bench, scale)| {
let bench = bench.borrow();
- b.iter(|| rt.block_on(bench.pooled_parser_concurrent(*scale)).unwrap())
+ b.iter(|| bench.pooled_parser_concurrent(*scale).unwrap())
},
);
group.bench_with_input(
BenchmarkId::new("quick_protobuf", scale),
- &(&bench, &rt, scale),
- |b, (bench, rt, scale)| {
+ &(&bench, scale),
+ |b, (bench, scale)| {
let bench = bench.borrow();
- b.iter(|| {
- rt.block_on(bench.quick_protobuf_parser_concurrent(*scale))
- .unwrap()
- })
+ b.iter(|| bench.quick_protobuf_parser_concurrent(*scale).unwrap())
},
);
group.bench_with_input(
BenchmarkId::new("rust_protobuf", scale),
- &(&bench, &rt, scale),
- |b, (bench, rt, scale)| {
+ &(&bench, scale),
+ |b, (bench, scale)| {
let bench = bench.borrow();
- b.iter(|| {
- rt.block_on(bench.rust_protobuf_parser_concurrent(*scale))
- .unwrap()
- })
+ b.iter(|| bench.rust_protobuf_parser_concurrent(*scale).unwrap())
},
);
}
diff --git a/src/benchmarks/remote_write_memory_bench.py b/src/benchmarks/remote_write_memory_bench.py
index 42a2e502..44f257c6 100644
--- a/src/benchmarks/remote_write_memory_bench.py
+++ b/src/benchmarks/remote_write_memory_bench.py
@@ -16,12 +16,9 @@
# specific language governing permissions and limitations
# under the License.
+import argparse
import subprocess
-import json
import sys
-import os
-from typing import Dict, Any
-import argparse
try:
from tabulate import tabulate
@@ -30,208 +27,95 @@ except ImportError:
print("Please install it with: pip3 install tabulate")
sys.exit(1)
-
-class MemoryBenchmark:
- def __init__(self, scale, mode, use_unsafe=False):
- self.project_path = "."
- self.data_path =
"../remote_write/tests/workloads/1709380533560664458.data"
- self.scale = scale
- self.mode = mode
- self.use_unsafe = use_unsafe
- self.parsers = [
- "pooled",
- "prost",
- "rust-protobuf",
- "quick-protobuf",
- ]
-
- def build_binary(self) -> bool:
- features_msg = " with unsafe-split" if self.use_unsafe else ""
- print(f"Building binary{features_msg}...")
- try:
- bin_name = "parser_mem"
- build_cmd = ["cargo", "build", "--release", "--bin", bin_name]
- if self.use_unsafe:
- build_cmd.extend(["--features", "unsafe-split"])
- result = subprocess.run(
- build_cmd,
- cwd=self.project_path,
- check=False,
- )
- if result.returncode != 0:
- print("Failed to build binary")
- return False
- return True
- except Exception as e:
- print(f"Failed to build binary: {e}")
- return False
-
- def run_parser(self, parser: str, mode: str, scale: int, bin_name: str) ->
Dict[str, Any]:
- binary_path = f"../../target/release/{bin_name}"
-
- cmd = [binary_path, mode, str(scale), parser]
-
- try:
- result = subprocess.run(
- cmd,
- cwd=self.project_path,
- capture_output=True,
- text=True,
- timeout=300, # 5 minute timeout
- )
-
- if result.returncode != 0:
- print(f"Error running {parser}: {result.stderr}")
- return {}
-
- return json.loads(result.stdout.strip())
-
- except subprocess.TimeoutExpired:
- print(f"Timeout running {parser} {mode} {scale}")
- return {}
- except json.JSONDecodeError as e:
- print(f"Failed to parse JSON from {parser}: {e}")
- print(f"Raw output: {result.stdout}")
- return {}
- except FileNotFoundError:
- print(f"Binary not found: {binary_path}")
- print(f"Please run: cargo build --release --bins")
- return {}
- except Exception as e:
- print(f"Exception running {parser}: {e}")
- return {}
-
- def run_benchmarks(self) -> Dict[str, Dict[str, Any]]:
- results = {}
- successful_count = 0
- total_count = len(self.parsers)
-
- print(f"\nRunning benchmarks for {total_count} parsers...")
-
- for i, parser in enumerate(self.parsers, 1):
- print(f"\n[{i}/{total_count}] Testing {parser}...")
- print(f"Running {self.mode} mode with scale {self.scale}...")
- result = self.run_parser(
- parser, self.mode, self.scale, "parser_mem")
-
- if result:
- result["parser"] = parser
- results[parser] = result
- successful_count += 1
- print(f"Success")
- else:
- print(f"Failed")
-
- print(
- f"\nCompleted: {successful_count}/{total_count} parsers succeeded")
-
- if successful_count == total_count:
- print("All parsers succeeded - generating report...")
- return results
- else:
- print("Some parsers failed - skipping report generation")
- return {}
-
- def analyze_results(self, results: Dict[str, Dict[str, Any]]):
- if not results:
- print("\nNo results to analyze - all parsers failed or were
skipped")
- return
-
- print(f"\n{'='*80}")
- print("MEMORY BENCHMARK RESULTS")
- print(f"{'='*80}")
- print(f"Mode: {self.mode.upper()}, Scale: {self.scale}")
- print()
-
- headers = [
- "Parser",
- "ThreadAlloc",
- "ThreadDealloc",
- "Allocated",
- "Active",
- "Metadata",
- "Mapped",
- "Resident",
- "Retained",
- ]
-
- table_data = []
- for parser in self.parsers:
- if parser in results:
- result = results[parser]
- memory = result.get("memory", {})
- row = [
- parser,
- f"{memory.get('thread_allocated_diff', 0):,}",
- f"{memory.get('thread_deallocated_diff', 0):,}",
- f"{memory.get('allocated', 0):,}",
- f"{memory.get('active', 0):,}",
- f"{memory.get('metadata', 0):,}",
- f"{memory.get('mapped', 0):,}",
- f"{memory.get('resident', 0):,}",
- f"{memory.get('retained', 0):,}",
- ]
- table_data.append(row)
-
- print("SUMMARY TABLE (All values in bytes)")
+PARSERS = ["pooled", "prost", "rust-protobuf", "quick-protobuf"]
+BINARY_PATH = "../../target/release/parser_mem"
+
+
+def build_binary(use_unsafe: bool):
+ print(f"Building binary{' with unsafe-split' if use_unsafe else ''}...")
+ cmd = ["cargo", "build", "--release", "--bin", "parser_mem"]
+ if use_unsafe:
+ cmd.extend(["--features", "unsafe-split"])
+ result = subprocess.run(cmd, cwd=".")
+ return result.returncode == 0
+
+
+def parse_total_alloc(stdout: str):
+ for line in stdout.splitlines():
+ if line.startswith("Total allocated bytes (cumulative):"):
+ try:
+ raw = line.split(":", 1)[1].strip().split(" ")[0]
+ return int(raw)
+ except Exception:
+ return None
+ return None
+
+
+def run_parser(parser: str, mode: str, scale: int):
+ cmd = [BINARY_PATH, mode, str(scale), parser]
+ try:
+ res = subprocess.run(
+ cmd, cwd=".", capture_output=True, text=True, timeout=300)
+ except Exception as e:
+ print(f"[error] run {parser}: {e}", file=sys.stderr)
+ return None
+ if res.returncode != 0:
+ print(f"[error] run {parser} failed: {res.stderr}", file=sys.stderr)
+ return None
+ total = parse_total_alloc(res.stdout)
+ if total is None:
print(
- tabulate(
- table_data,
- headers=headers,
- tablefmt="grid",
- stralign="right",
- numalign="right",
- )
+ f"[error] parse total bytes failed for {parser}. Output:\n{res.stdout}",
+ file=sys.stderr,
+ )
+ return total
+
+
+def print_table(results: dict, mode: str, scale: int):
+ print(f"\n{'='*60}")
+ print("MEMORY BENCHMARK RESULTS")
+ print(f"{'='*60}")
+ print(f"Mode: {mode}, Scale: {scale}")
+ headers = ["Parser", "TotalAllocatedBytes"]
+ rows = [[p, f"{results[p]:,}"] for p in PARSERS if p in results]
+ print(
+ tabulate(
+ rows,
+ headers=headers,
+ tablefmt="grid",
+ stralign="right",
+ numalign="right",
)
-
-
-def main():
- parser = argparse.ArgumentParser(
- description="Memory benchmark for protobuf parsers"
- )
- parser.add_argument(
- "--unsafe", action="store_true", help="Enable unsafe-split feature"
- )
- parser.add_argument(
- "--mode",
- choices=["sequential", "concurrent"],
- default="sequential",
- help="Test mode to run (default: sequential)",
- )
- parser.add_argument(
- "--scale",
- type=int,
- default=10,
- help="Scale value for benchmark (default: 10)",
)
- args = parser.parse_args()
+def main():
+ ap = argparse.ArgumentParser(
+ description="Memory benchmark for protobuf parsers")
+ ap.add_argument("--unsafe", action="store_true",
+ help="Enable unsafe-split feature")
+ ap.add_argument(
+ "--mode", choices=["sequential", "concurrent"], default="sequential",
help="Run mode")
+ ap.add_argument("--scale", type=int, default=10, help="Benchmark scale")
+ args = ap.parse_args()
if args.scale <= 0:
- print(f"Invalid scale value '{args.scale}', scale must be positive")
- sys.exit(1)
-
- data_path = "../remote_write/tests/workloads/1709380533560664458.data"
- if not os.path.exists(data_path):
- print(f"Test data file not found at {data_path}")
- print("Please ensure test data exists before running benchmarks")
- sys.exit(1)
-
- benchmark = MemoryBenchmark(
- scale=args.scale, mode=args.mode, use_unsafe=args.unsafe
- )
-
- if not benchmark.build_binary():
- sys.exit(1)
-
- print(f"\nRunning memory benchmarks...")
- print(f"Mode: {args.mode}")
- print(f"Scale: {args.scale}")
- print(f"Unsafe optimization: {'enabled' if args.unsafe else 'disabled'}")
-
- results = benchmark.run_benchmarks()
- benchmark.analyze_results(results)
+ print("scale must be positive", file=sys.stderr)
+ return 1
+ if not build_binary(args.unsafe):
+ print("build failed", file=sys.stderr)
+ return 1
+ results = {}
+ failed = False
+ for parser in PARSERS:
+ total = run_parser(parser, args.mode, args.scale)
+ if total is None:
+ failed = True
+ else:
+ results[parser] = total
+ if results:
+ print_table(results, args.mode, args.scale)
+ return 1 if failed else 0
if __name__ == "__main__":
- main()
+ sys.exit(main())
diff --git a/src/benchmarks/src/bin/parser_mem.rs b/src/benchmarks/src/bin/parser_mem.rs
index 0028cd57..a993ba6f 100644
--- a/src/benchmarks/src/bin/parser_mem.rs
+++ b/src/benchmarks/src/bin/parser_mem.rs
@@ -15,124 +15,118 @@
// specific language governing permissions and limitations
// under the License.
-use benchmarks::util::{MemoryBenchConfig, MemoryStats};
+use anyhow::Result;
+use benchmarks::util::{human_bytes, run_concurrent_threads, MemoryBenchConfig};
+use bytes::Bytes;
+use hotpath::{GuardBuilder, MetricType, MetricsProvider, Reporter};
use pb_types::WriteRequest as ProstWriteRequest;
use prost::Message;
use protobuf::Message as ProtobufMessage;
use quick_protobuf::{BytesReader, MessageRead};
use remote_write::pooled_parser::PooledParser;
-use tikv_jemallocator::Jemalloc;
-#[global_allocator]
-static ALLOC: Jemalloc = Jemalloc;
-
-#[tokio::main(flavor = "current_thread")]
-async fn main() -> Result<(), Box<dyn std::error::Error>> {
+fn main() -> Result<()> {
let config = MemoryBenchConfig::from_args();
let args: Vec<String> = std::env::args().collect();
let parser = args.get(3).map(|s| s.as_str()).unwrap_or("pooled");
- let start_stats = MemoryStats::collect()?;
+ let _guard = GuardBuilder::new("parser_mem")
+ .reporter(Box::new(TotalAllocReporter))
+ .build();
match config.mode.as_str() {
"sequential" => match parser {
- "pooled" => {
- let parser = PooledParser;
- for _ in 0..config.scale {
- let _ =
parser.decode_async(config.test_data.clone()).await?;
- }
- }
- "prost" => {
- for _ in 0..config.scale {
- ProstWriteRequest::decode(config.test_data.clone())?;
- }
- }
- "rust-protobuf" => {
- for _ in 0..config.scale {
- let _ =
benchmarks::rust_protobuf_remote_write::WriteRequest::parse_from_bytes(
- &config.test_data,
- )?;
- }
- }
- "quick-protobuf" => {
- for _ in 0..config.scale {
- let mut reader =
BytesReader::from_bytes(&config.test_data);
- let _ =
benchmarks::quick_protobuf_remote_write::WriteRequest::from_reader(
- &mut reader,
- &config.test_data,
- )?;
- }
- }
+ "pooled" => pooled_worker(config.test_data.clone(), config.scale),
+ "prost" => prost_worker(config.test_data.clone(), config.scale),
+ "rust-protobuf" => rust_protobuf_worker(config.test_data.clone(),
config.scale),
+ "quick-protobuf" =>
quick_protobuf_worker(config.test_data.clone(), config.scale),
other => panic!("unknown parser: {}", other),
},
"concurrent" => match parser {
- "pooled" => {
- let mut handles = Vec::new();
- for _ in 0..config.scale {
- let data_clone = config.test_data.clone();
- let handle = tokio::spawn(async move {
- let parser = PooledParser;
- let _ = parser.decode_async(data_clone).await;
- });
- handles.push(handle);
- }
- for handle in handles {
- handle.await?;
- }
- }
- "prost" => {
- let mut handles = Vec::new();
- for _ in 0..config.scale {
- let data_clone = config.test_data.clone();
- let handle = tokio::spawn(async move {
- let _ = ProstWriteRequest::decode(data_clone);
- });
- handles.push(handle);
- }
- for handle in handles {
- handle.await?;
- }
- }
- "rust-protobuf" => {
- let mut handles = Vec::new();
- for _ in 0..config.scale {
- let data_clone = config.test_data.clone();
- let handle = tokio::spawn(async move {
- let _ =
-
benchmarks::rust_protobuf_remote_write::WriteRequest::parse_from_bytes(
- &data_clone,
- );
- });
- handles.push(handle);
- }
- for handle in handles {
- handle.await?;
- }
- }
- "quick-protobuf" => {
- let mut handles = Vec::new();
- for _ in 0..config.scale {
- let data_clone = config.test_data.clone();
- let handle = tokio::spawn(async move {
- let mut reader = BytesReader::from_bytes(&data_clone);
- let _ =
benchmarks::quick_protobuf_remote_write::WriteRequest::from_reader(
- &mut reader,
- &data_clone,
- );
- });
- handles.push(handle);
- }
- for handle in handles {
- handle.await?;
- }
- }
+ "pooled" => run_concurrent_threads(config.scale, move |n| {
+ pooled_worker(config.test_data.clone(), n);
+ Ok(())
+ })
+ .map_err(anyhow::Error::msg)?,
+ "prost" => run_concurrent_threads(config.scale, move |n| {
+ prost_worker(config.test_data.clone(), n);
+ Ok(())
+ })
+ .map_err(anyhow::Error::msg)?,
+ "rust-protobuf" => run_concurrent_threads(config.scale, move |n| {
+ rust_protobuf_worker(config.test_data.clone(), n);
+ Ok(())
+ })
+ .map_err(anyhow::Error::msg)?,
+ "quick-protobuf" => run_concurrent_threads(config.scale, move |n| {
+ quick_protobuf_worker(config.test_data.clone(), n);
+ Ok(())
+ })
+ .map_err(anyhow::Error::msg)?,
other => panic!("unknown parser: {}", other),
},
_ => panic!("invalid mode"),
}
- let end_stats = MemoryStats::collect()?;
- let memory_diff = start_stats.diff(&end_stats);
- config.output_json(&memory_diff);
Ok(())
}
+
+#[cfg_attr(feature = "hotpath", hotpath::measure)]
+fn pooled_worker(data: Bytes, iterations: usize) {
+ let parser = PooledParser;
+ for _ in 0..iterations {
+ let _ = parser.decode(data.clone());
+ }
+}
+
+#[cfg_attr(feature = "hotpath", hotpath::measure)]
+fn prost_worker(data: Bytes, iterations: usize) {
+ for _ in 0..iterations {
+ let _ = ProstWriteRequest::decode(data.clone());
+ }
+}
+
+#[cfg_attr(feature = "hotpath", hotpath::measure)]
+fn rust_protobuf_worker(data: Bytes, iterations: usize) {
+ for _ in 0..iterations {
+ let _ =
benchmarks::rust_protobuf_remote_write::WriteRequest::parse_from_bytes(&data);
+ }
+}
+
+#[cfg_attr(feature = "hotpath", hotpath::measure)]
+fn quick_protobuf_worker(data: Bytes, iterations: usize) {
+ for _ in 0..iterations {
+ let mut reader = BytesReader::from_bytes(&data);
+ let _ =
+
benchmarks::quick_protobuf_remote_write::WriteRequest::from_reader(&mut reader,
&data);
+ }
+}
+
+struct TotalAllocReporter;
+
+impl Reporter for TotalAllocReporter {
+ fn report(&self, metrics: &dyn MetricsProvider<'_>) -> Result<(), Box<dyn
std::error::Error>> {
+ // In hotpath, each profiled function will output a row of [metrics](https://github.com/pawurb/hotpath/blob/main/hotpath-alloc-report.png):
+ // [calls, avg, pXX, total, %total], we only care about the `total`
+ // metric to verify our zero-allocation optimization. Since each function's
+ // metrics include those of its nested calls, we need to use the
+ // maximum value of the `total` field across all functions as
+ // the total memory allocated during decoding.
+ let mut max_total: u64 = 0;
+ for values in metrics.metric_data().values() {
+ if values.len() < 2 {
+ continue;
+ }
+ let total_idx = values.len() - 2;
+ if let MetricType::AllocBytes(bytes) = values[total_idx] {
+ max_total = std::cmp::max(max_total, bytes);
+ }
+ }
+ println!(
+ "Total allocated bytes (cumulative): {} ({})",
+ max_total,
+ human_bytes(max_total)
+ );
+ Ok(())
+ }
+}
diff --git a/src/benchmarks/src/bin/pool_stats.rs b/src/benchmarks/src/bin/pool_stats.rs
deleted file mode 100644
index 48115075..00000000
--- a/src/benchmarks/src/bin/pool_stats.rs
+++ /dev/null
@@ -1,82 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! evaluate the efficiency of the deadpool-backed object pool.
-
-use std::fs;
-
-use bytes::Bytes;
-use remote_write::{pooled_parser::PooledParser, pooled_types::POOL};
-use tikv_jemallocator::Jemalloc;
-use tokio::task::JoinHandle;
-
-#[global_allocator]
-static ALLOC: Jemalloc = Jemalloc;
-
-async fn run_concurrent_parsing(scale: usize) -> deadpool::Status {
- let data =
fs::read("../remote_write/tests/workloads/1709380533560664458.data")
- .expect("test data load failed");
- let data = Bytes::from(data);
-
- let handles: Vec<JoinHandle<()>> = (0..scale)
- .map(|_| {
- let data = data.clone();
- tokio::spawn(async move {
- let parser = PooledParser;
- let _ = parser
- .decode_async(data.clone())
- .await
- .expect("parse failed");
- })
- })
- .collect();
-
- for handle in handles {
- handle.await.expect("task completion failed");
- }
-
- POOL.status()
-}
-
-#[tokio::main]
-async fn main() {
- let scale_values = [1, 2, 5, 10, 20, 50, 100, 200, 500];
-
- println!(
- "{:<8} {:<10} {:<10} {:<10} {:<10}",
- "Scale", "MaxSize", "PoolSize", "Available", "Waiting"
- );
- println!("{}", "=".repeat(50));
-
- for &scale in &scale_values {
- let status = run_concurrent_parsing(scale).await;
-
- println!(
- "{:<8} {:<10} {:<10} {:<10} {:<10}",
- scale, status.max_size, status.size, status.available,
status.waiting
- );
-
- tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
- }
-
- println!("=== Final Pool Status ===");
- let final_status = POOL.status();
- println!("Max Pool Size: {}", final_status.max_size);
- println!("Current Pool Size: {}", final_status.size);
- println!("Available Objects: {}", final_status.available);
- println!("Waiting Requests: {}", final_status.waiting);
-}
diff --git a/src/benchmarks/src/remote_write_bench.rs b/src/benchmarks/src/remote_write_bench.rs
index b5372da5..c6f8ca65 100644
--- a/src/benchmarks/src/remote_write_bench.rs
+++ b/src/benchmarks/src/remote_write_bench.rs
@@ -17,7 +17,7 @@
//! remote write parser bench.
-use std::{fs, path::PathBuf};
+use std::{fs, path::PathBuf, sync::Arc};
use bytes::Bytes;
use pb_types::WriteRequest as ProstWriteRequest;
@@ -25,12 +25,12 @@ use prost::Message;
use protobuf::Message as ProtobufMessage;
use quick_protobuf::{BytesReader, MessageRead};
use remote_write::pooled_parser::PooledParser;
-use tokio::task::JoinHandle;
use crate::{
config::RemoteWriteConfig,
quick_protobuf_remote_write::WriteRequest as QuickProtobufWriteRequest,
rust_protobuf_remote_write::WriteRequest as RustProtobufWriteRequest,
+ util::run_concurrent_threads,
};
pub struct RemoteWriteBench {
@@ -59,14 +59,13 @@ impl RemoteWriteBench {
}
// Hand-written pooled parser sequential bench.
- pub async fn pooled_parser_sequential(&self, scale: usize) -> Result<(),
String> {
+ pub fn pooled_parser_sequential(&self, scale: usize) -> Result<(), String>
{
let parser = PooledParser;
for _ in 0..scale {
let data = Bytes::from(self.raw_data.clone());
let _ = parser
- .decode_async(data.clone())
- .await
- .map_err(|e| format!("pooled sequential parse failed: {:?}",
e))?;
+ .decode(data.clone())
+ .map_err(|e| format!("pooled sequential parse failed:
{e:?}"))?;
}
Ok(())
}
@@ -91,85 +90,55 @@ impl RemoteWriteBench {
}
// prost parser concurrent bench.
- pub async fn prost_parser_concurrent(&self, scale: usize) -> Result<(),
String> {
- let join_handles: Vec<JoinHandle<Result<(), String>>> = (0..scale)
- .map(|_| {
- let raw_data = self.raw_data.clone();
- tokio::spawn(async move {
- let data = Bytes::from(raw_data);
- ProstWriteRequest::decode(data)
- .map_err(|e| format!("prost concurrent parse failed:
{}", e))?;
- Ok(())
- })
- })
- .collect();
-
- for join_handle in join_handles {
- join_handle.await.unwrap()?;
- }
- Ok(())
+ pub fn prost_parser_concurrent(&self, scale: usize) -> Result<(), String> {
+ let raw = Arc::new(self.raw_data.clone());
+ run_concurrent_threads(scale, move |n| {
+ for _ in 0..n {
+ let data = Bytes::from((*raw).clone());
+ ProstWriteRequest::decode(data)
+ .map_err(|e| format!("prost concurrent parse failed: {}",
e))?;
+ }
+ Ok(())
+ })
}
// Hand-written pooled parser concurrent bench.
- pub async fn pooled_parser_concurrent(&self, scale: usize) -> Result<(),
String> {
- let parser = PooledParser;
- let join_handles: Vec<JoinHandle<Result<(), String>>> = (0..scale)
- .map(|_| {
- let parser = parser.clone();
- let raw_data = self.raw_data.clone();
- tokio::spawn(async move {
- let data = Bytes::from(raw_data);
- let _ = parser
- .decode_async(data.clone())
- .await
- .map_err(|e| format!("pooled concurrent parse failed:
{:?}", e))?;
- Ok(())
- })
- })
- .collect();
-
- for join_handle in join_handles {
- join_handle.await.unwrap()?;
- }
- Ok(())
+ pub fn pooled_parser_concurrent(&self, scale: usize) -> Result<(), String>
{
+ let raw = Arc::new(self.raw_data.clone());
+ run_concurrent_threads(scale, move |n| {
+ let parser = PooledParser;
+ for _ in 0..n {
+ let data = Bytes::from((*raw).clone());
+ let _ = parser
+ .decode(data.clone())
+ .map_err(|e| format!("pooled concurrent parse failed:
{e:?}"))?;
+ }
+ Ok(())
+ })
}
// quick-protobuf parser concurrent bench.
- pub async fn quick_protobuf_parser_concurrent(&self, scale: usize) ->
Result<(), String> {
- let join_handles: Vec<tokio::task::JoinHandle<Result<(), String>>> =
(0..scale)
- .map(|_| {
- let data = self.raw_data.clone();
- tokio::spawn(async move {
- let mut reader = BytesReader::from_bytes(&data);
- QuickProtobufWriteRequest::from_reader(&mut reader, &data)
- .map_err(|e| format!("quick-protobuf concurrent parse
failed: {}", e))?;
- Ok(())
- })
- })
- .collect();
-
- for join_handle in join_handles {
- join_handle.await.unwrap()?;
- }
- Ok(())
+ pub fn quick_protobuf_parser_concurrent(&self, scale: usize) -> Result<(),
String> {
+ let raw = Arc::new(self.raw_data.clone());
+ run_concurrent_threads(scale, move |n| {
+ for _ in 0..n {
+ let mut reader: BytesReader = BytesReader::from_bytes(&raw);
+ QuickProtobufWriteRequest::from_reader(&mut reader, &raw)
+ .map_err(|e| format!("quick-protobuf concurrent parse
failed: {}", e))?;
+ }
+ Ok(())
+ })
}
// rust-protobuf parser concurrent bench.
- pub async fn rust_protobuf_parser_concurrent(&self, scale: usize) ->
Result<(), String> {
- let join_handles: Vec<tokio::task::JoinHandle<Result<(), String>>> =
(0..scale)
- .map(|_| {
- let data = self.raw_data.clone();
- tokio::spawn(async move {
- RustProtobufWriteRequest::parse_from_bytes(&data)
- .map_err(|e| format!("rust-protobuf concurrent parse
failed: {}", e))?;
- Ok(())
- })
- })
- .collect();
-
- for join_handle in join_handles {
- join_handle.await.unwrap()?;
- }
- Ok(())
+ pub fn rust_protobuf_parser_concurrent(&self, scale: usize) -> Result<(),
String> {
+ let raw = Arc::new(self.raw_data.clone());
+ run_concurrent_threads(scale, move |n| {
+ for _ in 0..n {
+ RustProtobufWriteRequest::parse_from_bytes(&raw)
+ .map_err(|e| format!("rust-protobuf concurrent parse
failed: {}", e))?;
+ }
+ Ok(())
+ })
}
}
diff --git a/src/benchmarks/src/util.rs b/src/benchmarks/src/util.rs
index 4d74f785..92f32121 100644
--- a/src/benchmarks/src/util.rs
+++ b/src/benchmarks/src/util.rs
@@ -30,8 +30,6 @@ use serde::{
de::{self, Visitor},
Deserialize, Deserializer, Serialize, Serializer,
};
-use serde_json::json;
-use tikv_jemalloc_ctl::{epoch, stats, thread};
#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Default)]
pub struct ReadableDuration(pub Duration);
@@ -174,84 +172,6 @@ impl<'de> Deserialize<'de> for ReadableDuration {
}
}
-// Memory bench utilities.
-#[derive(Debug, Clone)]
-pub struct MemoryStats {
- pub thread_allocated: u64,
- pub thread_deallocated: u64,
- pub allocated: u64,
- pub active: u64,
- pub metadata: u64,
- pub mapped: u64,
- pub resident: u64,
- pub retained: u64,
-}
-
-#[derive(Debug, Clone)]
-pub struct MemoryStatsDiff {
- pub thread_allocated_diff: i64,
- pub thread_deallocated_diff: i64,
- pub allocated: i64,
- pub active: i64,
- pub metadata: i64,
- pub mapped: i64,
- pub resident: i64,
- pub retained: i64,
-}
-
-impl MemoryStats {
- pub fn collect() -> Result<Self, String> {
- epoch::advance().map_err(|e| format!("failed to advance jemalloc
epoch: {}", e))?;
-
- Ok(MemoryStats {
- thread_allocated: thread::allocatedp::read()
- .map_err(|e| format!("failed to read thread.allocatedp: {}",
e))?
- .get(),
- thread_deallocated: thread::deallocatedp::read()
- .map_err(|e| format!("failed to read thread.deallocatedp: {}",
e))?
- .get(),
- allocated: stats::allocated::read()
- .map_err(|e| format!("failed to read allocated: {}", e))?
- .try_into()
- .unwrap(),
- active: stats::active::read()
- .map_err(|e| format!("failed to read active: {}", e))?
- .try_into()
- .unwrap(),
- metadata: stats::metadata::read()
- .map_err(|e| format!("failed to read metadata: {}", e))?
- .try_into()
- .unwrap(),
- mapped: stats::mapped::read()
- .map_err(|e| format!("failed to read mapped: {}", e))?
- .try_into()
- .unwrap(),
- resident: stats::resident::read()
- .map_err(|e| format!("failed to read resident: {}", e))?
- .try_into()
- .unwrap(),
- retained: stats::retained::read()
- .map_err(|e| format!("failed to read retained: {}", e))?
- .try_into()
- .unwrap(),
- })
- }
-
- pub fn diff(&self, other: &MemoryStats) -> MemoryStatsDiff {
- MemoryStatsDiff {
- thread_allocated_diff: other.thread_allocated as i64 -
self.thread_allocated as i64,
- thread_deallocated_diff: other.thread_deallocated as i64
- - self.thread_deallocated as i64,
- allocated: other.allocated as i64 - self.allocated as i64,
- active: other.active as i64 - self.active as i64,
- metadata: other.metadata as i64 - self.metadata as i64,
- mapped: other.mapped as i64 - self.mapped as i64,
- resident: other.resident as i64 - self.resident as i64,
- retained: other.retained as i64 - self.retained as i64,
- }
- }
-}
-
pub struct MemoryBenchConfig {
pub test_data: Bytes,
pub scale: usize,
@@ -274,22 +194,50 @@ impl MemoryBenchConfig {
mode,
}
}
+}
- pub fn output_json(&self, memory_diff: &MemoryStatsDiff) {
- let result = json!({
- "mode": self.mode,
- "scale": self.scale,
- "memory": {
- "thread_allocated_diff": memory_diff.thread_allocated_diff,
- "thread_deallocated_diff": memory_diff.thread_deallocated_diff,
- "allocated": memory_diff.allocated,
- "active": memory_diff.active,
- "metadata": memory_diff.metadata,
- "mapped": memory_diff.mapped,
- "resident": memory_diff.resident,
- "retained": memory_diff.retained
- }
- });
- println!("{}", result);
+/// Run a workload concurrently with up to CPU core count threads.
+pub fn run_concurrent_threads<F>(scale: usize, worker: F) -> Result<(), String>
+where
+ F: Fn(usize) -> Result<(), String> + Send + Sync + 'static,
+{
+ let threads = std::cmp::min(scale, num_cpus::get());
+ if threads == 0 {
+ return Ok(());
+ }
+ let base = scale / threads;
+ let extra = scale % threads;
+ let worker = std::sync::Arc::new(worker);
+ let mut handles = Vec::with_capacity(threads);
+ for i in 0..threads {
+ let n = base + if i < extra { 1 } else { 0 };
+ if n == 0 {
+ continue;
+ }
+ let w = std::sync::Arc::clone(&worker);
+ handles.push(std::thread::spawn(move || (w)(n)));
+ }
+ for h in handles {
+ h.join().map_err(|_| "thread panicked".to_string())??;
+ }
+ Ok(())
+}
+
+/// Convert bytes number to a human-readable string.
+pub fn human_bytes(bytes: u64) -> String {
+ const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
+ if bytes == 0 {
+ return "0 B".to_string();
+ }
+ let mut size = bytes as f64;
+ let mut unit = 0usize;
+ while size >= 1024.0 && unit < UNITS.len() - 1 {
+ size /= 1024.0;
+ unit += 1;
+ }
+ if unit == 0 {
+ format!("{} {}", bytes, UNITS[unit])
+ } else {
+ format!("{:.1} {}", size, UNITS[unit])
}
}
diff --git a/src/remote_write/Cargo.toml b/src/remote_write/Cargo.toml
index 344308c4..e0295cad 100644
--- a/src/remote_write/Cargo.toml
+++ b/src/remote_write/Cargo.toml
@@ -30,10 +30,8 @@ unsafe-split = []
[dependencies]
anyhow = { workspace = true }
-async-trait = { workspace = true }
bytes = { workspace = true }
-deadpool = { workspace = true }
+object-pool = { workspace = true }
once_cell = { workspace = true }
pb_types = { workspace = true }
prost = { workspace = true }
-tokio = { workspace = true }
diff --git a/src/remote_write/README.md b/src/remote_write/README.md
index f41a00c5..173d6794 100644
--- a/src/remote_write/README.md
+++ b/src/remote_write/README.md
@@ -6,7 +6,7 @@ A hand-written [Prometheus Remote Write Request
(V1)](https://prometheus.io/docs
Key optimization techniques:
-- Object pooling backed by deadpool.
+- Object pooling backed by [object-pool](https://github.com/CJP10/object-pool).
- `RepeatedField` data structures.
@@ -29,6 +29,12 @@ pip3 install tabulate matplotlib
### CPU Time
+#### Benchmark Logic
+
+We benchmarked both sequential and concurrent parsing scenarios. In the
sequential case, we simply executed the parsing task `scale` times within a for
loop. In the concurrent case, we spawned `min(scale, number of CPU cores)`
system-level threads and distributed the `scale` parsing tasks as evenly as
possible among them, with each thread parsing sequentially in a for loop.
+
+**Note**: Since Go cannot directly create a system-level thread the way
`std::thread::spawn` does in Rust, we omitted the concurrent benchmark for Go.
+
#### Steps
Navigate to the benchmarks directory:
@@ -58,7 +64,7 @@ cd VictoriaMetrics/lib/prompb
vim prom_decode_bench_test.go
```
-and add the following code (please change the path of
1709380533560664458.data):
+and add the following code (please change the path of
`1709380533560664458.data`):
```go
package prompb
@@ -142,29 +148,6 @@ func benchDecoderSequential(decoder Decoder, data []byte,
n int) error {
return nil
}
-// Concurrent benchmark.
-func benchDecoderConcurrent(decoder Decoder, data []byte, workers int) error {
- results := make(chan error, workers)
-
- // Spawn workers (similar to tokio::spawn in Rust).
- for w := 0; w < workers; w++ {
- go func() {
- clonedDecoder := decoder.Clone()
- clonedDecoder.Reset()
- err := clonedDecoder.Parse(data)
- results <- err
- }()
- }
-
- // Wait for all workers to complete (similar to join_handle.await).
- for w := 0; w < workers; w++ {
- if err := <-results; err != nil {
- return err
- }
- }
- return nil
-}
-
func BenchmarkSequentialParse(b *testing.B) {
data, err := getTestDataPath()
if err != nil {
@@ -191,33 +174,6 @@ func BenchmarkSequentialParse(b *testing.B) {
}
}
}
-
-func BenchmarkConcurrentParse(b *testing.B) {
- data, err := getTestDataPath()
- if err != nil {
- b.Skipf("test data file not found: %v", err)
- }
-
- decoders := map[string]Decoder{
- "pooled": NewPooledDecoder(),
- "nopool": NewNoPoolDecoder(),
- }
-
- workers := []int{1, 5, 10, 20, 100}
-
- for decoderName, decoder := range decoders {
- for _, w := range workers {
- b.Run(fmt.Sprintf("%s/%d", decoderName, w), func(b *testing.B) {
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- if err := benchDecoderConcurrent(decoder, data, w); err !=
nil {
- b.Fatalf("failed to parse: %v", err)
- }
- }
- })
- }
- }
-}
```
Execute the Go benchmarks:
@@ -232,16 +188,18 @@ Test results are as follows:

-In sequential parsing scenarios, the hand-written pooled parsers (with and
without unsafe optimization) achieve the best performance across all scales
compared to other Rust parsers. The unsafe optimization provides nearly 50%
performance improvement.
-

-**Note**: Due to the nature of concurrent execution, concurrent parsing
benchmark results may vary (sometimes dramatically) across different runs.
However, we can still draw an overall conclusion.
-
-In concurrent parsing scenarios, from an overall perspective, the hand-written
pooled parsers (with and without unsafe optimization) still achieve the best
performance compared to other Rust parsers. The unsafe optimization continues
to provide performance improvements.
+In both scenarios, the hand-written pooled parsers (with and without unsafe
optimization) achieve the best performance across all scales compared to other
Rust parsers. The unsafe optimization provides a nearly 50% performance
improvement.
### Memory Allocation
+#### Benchmark Logic
+
+We use [hotpath](https://github.com/pawurb/hotpath) to measure the memory
allocation performance of different parsers.
+
+#### Steps
+
Navigate to the benchmarks directory:
```shell
@@ -251,40 +209,22 @@ cd src/benchmarks
Run memory allocation benchmarks:
```shell
-python3 remote_write_memory_bench.py --mode sequential --scale 10
+python3 remote_write_memory_bench.py --mode sequential --scale 64
```
Or enable unsafe optimization:
```shell
-python3 remote_write_memory_bench.py --mode concurrent --scale 10 --unsafe
+python3 remote_write_memory_bench.py --mode concurrent --scale 64 --unsafe
```
-**Note**: Sequential and concurrent mode results are similar due to the
enforced `#[tokio::main(flavor = "current_thread")]` configuration. This
constraint is necessary because Jemalloc's `thread::allocatedp` and
`thread::deallocatedp` statistics can only track single-threaded allocations
accurately.
-
-We focus on the
[allocatedp](https://docs.rs/tikv-jemalloc-ctl/0.6.0/tikv_jemalloc_ctl/thread/struct.allocatedp.html)
value to verify our zero-allocation parsing efforts, since it represents the
number of bytes that **have ever been** allocated by the thread.
+#### Results
The results are as follows:

-The hand-written pooled parser allocates minimal memory compared to other Rust
parsers. Note that the difference between `ThreadAlloc` and `ThreadDealloc` in
the pooled decoder is expected since we gather statistics right before the
program terminates and objects remain in the pool (not freed) at that time.
-
-### Object Pool Efficiency
-
-Navigate to the benchmarks directory:
-
-```shell
-cd src/benchmarks
-```
-
-Analyze pool utilization:
-
-```shell
-cargo run --bin pool_stats --release
-```
-
-Our testing finds that only 8 objects in the pool are sufficient to handle 500
concurrent parsing operations.
+The hand-written pooled parser allocates minimal memory in total
compared to other Rust parsers, demonstrating the effectiveness of our
zero-allocation optimization.
## Acknowledgements
diff --git a/src/remote_write/src/pooled_parser.rs
b/src/remote_write/src/pooled_parser.rs
index 8b2c62fb..3b90b751 100644
--- a/src/remote_write/src/pooled_parser.rs
+++ b/src/remote_write/src/pooled_parser.rs
@@ -25,10 +25,11 @@
use anyhow::Result;
use bytes::Bytes;
+use object_pool::ReusableOwned;
use crate::{
pb_reader::read_write_request,
- pooled_types::{WriteRequest, WriteRequestManager, POOL},
+ pooled_types::{WriteRequest, POOL},
repeated_field::Clear,
};
@@ -40,26 +41,13 @@ impl PooledParser {
Self
}
- /// Decode a [`WriteRequest`] from the buffer and return it.
- pub fn decode(&self, buf: Bytes) -> Result<WriteRequest> {
- // Cannot get a WriteRequest instance from the pool in sync functions.
- let mut request = WriteRequest::default();
- read_write_request(buf, &mut request)?;
- Ok(request)
- }
-
- /// Decode a [`WriteRequest`] from the buffer and return a pooled object.
+ /// Decode a [`WriteRequest`] from the buffer.
///
/// This method will reuse a [`WriteRequest`] instance from the object
/// pool. After the returned object is dropped, it will be returned to the
- pub async fn decode_async(
- &self,
- buf: Bytes,
- ) -> Result<deadpool::managed::Object<WriteRequestManager>> {
- let mut pooled_request = POOL
- .get()
- .await
- .map_err(|e| anyhow::anyhow!("failed to get object from pool:
{e:?}"))?;
+ /// pool.
+ pub fn decode(&self, buf: Bytes) -> Result<ReusableOwned<WriteRequest>> {
+ let mut pooled_request = POOL.pull_owned(WriteRequest::default);
pooled_request.clear();
read_write_request(buf, &mut pooled_request)?;
Ok(pooled_request)
diff --git a/src/remote_write/src/pooled_types.rs
b/src/remote_write/src/pooled_types.rs
index 427eb607..b0ba6e2d 100644
--- a/src/remote_write/src/pooled_types.rs
+++ b/src/remote_write/src/pooled_types.rs
@@ -15,9 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use async_trait::async_trait;
+use std::sync::Arc;
+
use bytes::Bytes;
-use deadpool::managed::{Manager, Metrics, Pool, RecycleResult};
+use object_pool::Pool;
use once_cell::sync::Lazy;
use crate::repeated_field::{Clear, RepeatedField};
@@ -160,33 +161,8 @@ impl Clear for WriteRequest {
}
}
-/// A deadpool manager for PooledWriteRequest.
-pub struct WriteRequestManager;
-
-#[async_trait]
-impl Manager for WriteRequestManager {
- type Error = ();
- type Type = WriteRequest;
-
- async fn create(&self) -> Result<Self::Type, Self::Error> {
- Ok(WriteRequest::default())
- }
-
- async fn recycle(
- &self,
- _obj: &mut Self::Type,
- _metrics: &Metrics,
- ) -> RecycleResult<Self::Error> {
- // We will reset the object after acquiring it.
- Ok(())
- }
-}
-
-const POOL_SIZE: usize = 64; // Maximum number of objects in the pool.
+const POOL_SIZE: usize = 16; // Maximum number of objects in the pool.
-pub static POOL: Lazy<Pool<WriteRequestManager>> = Lazy::new(|| {
- Pool::builder(WriteRequestManager)
- .max_size(POOL_SIZE)
- .build()
- .unwrap()
-});
+/// Global thread-safe object pool for `WriteRequest`.
+pub static POOL: Lazy<Arc<Pool<WriteRequest>>> =
+ Lazy::new(|| Arc::new(Pool::new(POOL_SIZE, WriteRequest::default)));
diff --git a/src/remote_write/tests/equivalence_test.rs
b/src/remote_write/tests/equivalence_test.rs
index e545ecd4..96b2c7bd 100644
--- a/src/remote_write/tests/equivalence_test.rs
+++ b/src/remote_write/tests/equivalence_test.rs
@@ -21,13 +21,12 @@
//!
//! Test with `--features unsafe-split` to enable the unsafe optimization.
-use std::{fs, sync::Arc};
+use std::{fs, sync::Arc, thread};
use bytes::Bytes;
use pb_types::{Exemplar, Label, MetricMetadata, Sample, TimeSeries,
WriteRequest};
use prost::Message;
use remote_write::pooled_parser::PooledParser;
-use tokio::task::JoinHandle;
const ITERATIONS: usize = 50;
@@ -47,13 +46,10 @@ fn parse_with_prost(data: &Bytes) -> WriteRequest {
WriteRequest::decode(data.clone()).expect("prost decode failed")
}
-async fn parse_with_pooled(data: &Bytes) -> WriteRequest {
+fn parse_with_pooled(data: &Bytes) -> WriteRequest {
let data_copy = data.clone();
let parser = PooledParser;
- let pooled_request = parser
- .decode_async(data_copy)
- .await
- .expect("pooled decode failed");
+ let pooled_request = parser.decode(data_copy).expect("pooled decode
failed");
// Convert pooled types to pb_types to compare with prost.
let mut write_request = WriteRequest {
@@ -118,8 +114,8 @@ async fn parse_with_pooled(data: &Bytes) -> WriteRequest {
write_request
}
-#[tokio::test]
-async fn test_sequential_correctness() {
+#[test]
+fn test_sequential_correctness() {
let (data1, data2) = load_test_data();
let datasets = [&data1, &data2];
@@ -128,7 +124,7 @@ async fn test_sequential_correctness() {
let data = datasets[data_index];
let prost_result = parse_with_prost(data);
- let pooled_result = parse_with_pooled(data).await;
+ let pooled_result = parse_with_pooled(data);
assert_eq!(
&prost_result, &pooled_result,
@@ -138,19 +134,19 @@ async fn test_sequential_correctness() {
}
}
-#[tokio::test]
-async fn test_concurrent_correctness() {
+#[test]
+fn test_concurrent_correctness() {
let (data1, data2) = load_test_data();
let data1 = Arc::new(data1);
let data2 = Arc::new(data2);
- let mut handles: Vec<JoinHandle<()>> = Vec::new();
+ let mut handles = Vec::new();
for iteration in 0..ITERATIONS {
let data1_clone = Arc::clone(&data1);
let data2_clone = Arc::clone(&data2);
- let handle = tokio::spawn(async move {
+ let handle = thread::spawn(move || {
let data_index = iteration % 2;
let data = if data_index == 0 {
&*data1_clone
@@ -159,7 +155,7 @@ async fn test_concurrent_correctness() {
};
let prost_result = parse_with_prost(data);
- let pooled_result = parse_with_pooled(data).await;
+ let pooled_result = parse_with_pooled(data);
assert_eq!(
&prost_result, &pooled_result,
@@ -172,6 +168,6 @@ async fn test_concurrent_correctness() {
}
for handle in handles {
- handle.await.expect("task completion failed");
+ handle.join().expect("thread completion failed");
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]