This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f9698cd refactor: remove async remote write decode logic (#1631)
8f9698cd is described below

commit 8f9698cdf83d02e12b93581464114d4e7cef483a
Author: Peiyang He <[email protected]>
AuthorDate: Thu Dec 11 15:50:23 2025 +0800

    refactor: remove async remote write decode logic (#1631)
    
    ## Rationale
    Deserialization is a CPU-intensive task, so introducing async does not
    provide significant benefits. In addition, using async causes function
    contagion issues. Therefore, it is necessary to remove the async decode
    logic.
    
    
    ## Detailed Changes
    - Remove `decode_async` interface
    - Change the back-end object pool dependency from `deadpool` to
    `object-pool`
    - Change benchmark logic accordingly
    
    
    ## Test Plan
    Manual test
---
 Cargo.lock                                         | 897 +++++++++++++++++----
 Cargo.toml                                         |   4 +-
 .../assets/remote-write-concurrent-performance.png | Bin 578747 -> 460173 bytes
 docs/assets/remote-write-memory-performance.png    | Bin 119032 -> 55817 bytes
 .../assets/remote-write-sequential-performance.png | Bin 620542 -> 607779 bytes
 src/benchmarks/Cargo.toml                          |  14 +-
 src/benchmarks/benches/bench.rs                    |  42 +-
 src/benchmarks/remote_write_memory_bench.py        | 284 ++-----
 src/benchmarks/src/bin/parser_mem.rs               | 190 +++--
 src/benchmarks/src/bin/pool_stats.rs               |  82 --
 src/benchmarks/src/remote_write_bench.rs           | 123 ++-
 src/benchmarks/src/util.rs                         | 140 +---
 src/remote_write/Cargo.toml                        |   4 +-
 src/remote_write/README.md                         |  98 +--
 src/remote_write/src/pooled_parser.rs              |  24 +-
 src/remote_write/src/pooled_types.rs               |  38 +-
 src/remote_write/tests/equivalence_test.rs         |  28 +-
 17 files changed, 1093 insertions(+), 875 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index a97698ea..031004da 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -40,7 +40,7 @@ dependencies = [
  "flate2",
  "futures-core",
  "h2",
- "http",
+ "http 0.2.12",
  "httparse",
  "httpdate",
  "itoa",
@@ -76,7 +76,7 @@ checksum = 
"13d324164c51f63867b57e73ba5936ea151b8a41a1d23d1031eeb9f70d0236f8"
 dependencies = [
  "bytestring",
  "cfg-if",
- "http",
+ "http 0.2.12",
  "regex",
  "regex-lite",
  "serde",
@@ -105,7 +105,7 @@ dependencies = [
  "futures-core",
  "futures-util",
  "mio",
- "socket2",
+ "socket2 0.5.7",
  "tokio",
  "tracing",
 ]
@@ -150,7 +150,7 @@ dependencies = [
  "bytes",
  "bytestring",
  "cfg-if",
- "cookie",
+ "cookie 0.16.2",
  "derive_more",
  "encoding_rs",
  "futures-core",
@@ -168,7 +168,7 @@ dependencies = [
  "serde_json",
  "serde_urlencoded",
  "smallvec",
- "socket2",
+ "socket2 0.5.7",
  "time",
  "url",
 ]
@@ -185,15 +185,6 @@ dependencies = [
  "syn",
 ]
 
-[[package]]
-name = "addr2line"
-version = "0.24.1"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "f5fb1d8e4442bd405fdfd1dacb42792696b0cf9cb15882e5d097b742a676d375"
-dependencies = [
- "gimli",
-]
-
 [[package]]
 name = "adler2"
 version = "2.0.0"
@@ -320,6 +311,12 @@ version = "1.0.87"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "10f00e1f6e58a40e807377c75c6a7f97bf9044fab57816f2414e6f5f4499d7b8"
 
+[[package]]
+name = "arc-swap"
+version = "1.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
+
 [[package]]
 name = "arrayref"
 version = "0.3.8"
@@ -381,7 +378,7 @@ dependencies = [
  "chrono",
  "chrono-tz",
  "half",
- "hashbrown",
+ "hashbrown 0.14.5",
  "num",
 ]
 
@@ -604,21 +601,6 @@ version = "1.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
 
-[[package]]
-name = "backtrace"
-version = "0.3.74"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a"
-dependencies = [
- "addr2line",
- "cfg-if",
- "libc",
- "miniz_oxide",
- "object",
- "rustc-demangle",
- "windows-targets",
-]
-
 [[package]]
 name = "base64"
 version = "0.22.1"
@@ -629,11 +611,12 @@ checksum = 
"72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
 name = "benchmarks"
 version = "2.2.0-alpha"
 dependencies = [
+ "anyhow",
  "bytes",
  "columnar_storage",
  "common",
  "criterion",
- "deadpool",
+ "hotpath",
  "num_cpus",
  "pb_types",
  "prost",
@@ -643,8 +626,6 @@ dependencies = [
  "remote_write",
  "serde",
  "serde_json",
- "tikv-jemalloc-ctl",
- "tikv-jemallocator",
  "tokio",
  "toml",
  "tracing",
@@ -782,10 +763,11 @@ checksum = 
"37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
 
 [[package]]
 name = "cc"
-version = "1.1.18"
+version = "1.2.43"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "b62ac837cdb5cb22e10a256099b4fc502b1dfe560cb282963a974d7abd80e476"
+checksum = "739eb0f94557554b3ca9a86d2d37bebd49c5e6d0c1d2bda35ba5bdac830befc2"
 dependencies = [
+ "find-msvc-tools",
  "jobserver",
  "libc",
  "shlex",
@@ -806,7 +788,7 @@ dependencies = [
  "android-tzdata",
  "iana-time-zone",
  "num-traits",
- "windows-targets",
+ "windows-targets 0.52.6",
 ]
 
 [[package]]
@@ -903,6 +885,15 @@ version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
 
+[[package]]
+name = "colored"
+version = "3.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "fde0e0ec90c9dfb3b4b1a0891a7dcd0e2bffde2f7efed5fe7c9bb00e5bfb915e"
+dependencies = [
+ "windows-sys 0.59.0",
+]
+
 [[package]]
 name = "columnar_storage"
 version = "2.2.0-alpha"
@@ -993,6 +984,35 @@ dependencies = [
  "version_check",
 ]
 
+[[package]]
+name = "cookie"
+version = "0.18.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "4ddef33a339a91ea89fb53151bd0a4689cfce27055c291dfa69945475d22c747"
+dependencies = [
+ "percent-encoding",
+ "time",
+ "version_check",
+]
+
+[[package]]
+name = "cookie_store"
+version = "0.22.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3fc4bff745c9b4c7fb1e97b25d13153da2bc7796260141df62378998d070207f"
+dependencies = [
+ "cookie 0.18.1",
+ "document-features",
+ "idna 1.1.0",
+ "indexmap",
+ "log",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "time",
+ "url",
+]
+
 [[package]]
 name = "core-foundation-sys"
 version = "0.8.7"
@@ -1053,6 +1073,15 @@ dependencies = [
  "itertools 0.10.5",
 ]
 
+[[package]]
+name = "crossbeam-channel"
+version = "0.5.15"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-deque"
 version = "0.8.6"
@@ -1123,7 +1152,7 @@ checksum = 
"5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
 dependencies = [
  "cfg-if",
  "crossbeam-utils",
- "hashbrown",
+ "hashbrown 0.14.5",
  "lock_api",
  "once_cell",
  "parking_lot_core",
@@ -1165,7 +1194,7 @@ dependencies = [
  "futures",
  "glob",
  "half",
- "hashbrown",
+ "hashbrown 0.14.5",
  "indexmap",
  "itertools 0.13.0",
  "log",
@@ -1214,7 +1243,7 @@ dependencies = [
  "arrow-schema",
  "chrono",
  "half",
- "hashbrown",
+ "hashbrown 0.14.5",
  "indexmap",
  "instant",
  "libc",
@@ -1248,7 +1277,7 @@ dependencies = [
  "datafusion-common",
  "datafusion-expr",
  "futures",
- "hashbrown",
+ "hashbrown 0.14.5",
  "log",
  "object_store",
  "parking_lot",
@@ -1308,7 +1337,7 @@ dependencies = [
  "datafusion-common",
  "datafusion-execution",
  "datafusion-expr",
- "hashbrown",
+ "hashbrown 0.14.5",
  "hex",
  "itertools 0.13.0",
  "log",
@@ -1415,7 +1444,7 @@ dependencies = [
  "datafusion-common",
  "datafusion-expr",
  "datafusion-physical-expr",
- "hashbrown",
+ "hashbrown 0.14.5",
  "indexmap",
  "itertools 0.13.0",
  "log",
@@ -1443,7 +1472,7 @@ dependencies = [
  "datafusion-functions-aggregate-common",
  "datafusion-physical-expr-common",
  "half",
- "hashbrown",
+ "hashbrown 0.14.5",
  "indexmap",
  "itertools 0.13.0",
  "log",
@@ -1461,7 +1490,7 @@ dependencies = [
  "arrow",
  "datafusion-common",
  "datafusion-expr-common",
- "hashbrown",
+ "hashbrown 0.14.5",
  "rand",
 ]
 
@@ -1505,7 +1534,7 @@ dependencies = [
  "datafusion-physical-expr-common",
  "futures",
  "half",
- "hashbrown",
+ "hashbrown 0.14.5",
  "indexmap",
  "itertools 0.13.0",
  "log",
@@ -1534,24 +1563,6 @@ dependencies = [
  "strum",
 ]
 
-[[package]]
-name = "deadpool"
-version = "0.10.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490"
-dependencies = [
- "async-trait",
- "deadpool-runtime",
- "num_cpus",
- "tokio",
-]
-
-[[package]]
-name = "deadpool-runtime"
-version = "0.1.4"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b"
-
 [[package]]
 name = "deranged"
 version = "0.3.11"
@@ -1585,12 +1596,59 @@ dependencies = [
  "subtle",
 ]
 
+[[package]]
+name = "dirs-next"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1"
+dependencies = [
+ "cfg-if",
+ "dirs-sys-next",
+]
+
+[[package]]
+name = "dirs-sys-next"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d"
+dependencies = [
+ "libc",
+ "redox_users",
+ "winapi",
+]
+
+[[package]]
+name = "displaydoc"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "document-features"
+version = "0.2.12"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "d4b8a88685455ed29a21542a33abd9cb6510b6b129abadabdcef0f4c55bc8f61"
+dependencies = [
+ "litrs",
+]
+
 [[package]]
 name = "either"
 version = "1.13.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0"
 
+[[package]]
+name = "encode_unicode"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0"
+
 [[package]]
 name = "encoding_rs"
 version = "0.8.35"
@@ -1637,12 +1695,28 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
+[[package]]
+name = "eyre"
+version = "0.6.12"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "7cd915d99f24784cdc19fd37ef22b97e3ff0ae756c7e492e9fbfe897d61e2aec"
+dependencies = [
+ "indenter",
+ "once_cell",
+]
+
 [[package]]
 name = "fastrand"
 version = "2.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
 
+[[package]]
+name = "find-msvc-tools"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127"
+
 [[package]]
 name = "fixedbitset"
 version = "0.4.2"
@@ -1794,12 +1868,6 @@ dependencies = [
  "wasi",
 ]
 
-[[package]]
-name = "gimli"
-version = "0.31.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64"
-
 [[package]]
 name = "glob"
 version = "0.3.1"
@@ -1817,7 +1885,7 @@ dependencies = [
  "futures-core",
  "futures-sink",
  "futures-util",
- "http",
+ "http 0.2.12",
  "indexmap",
  "slab",
  "tokio",
@@ -1846,6 +1914,22 @@ dependencies = [
  "allocator-api2",
 ]
 
+[[package]]
+name = "hashbrown"
+version = "0.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d"
+
+[[package]]
+name = "hdrhistogram"
+version = "7.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d"
+dependencies = [
+ "byteorder",
+ "num-traits",
+]
+
 [[package]]
 name = "heck"
 version = "0.5.0"
@@ -1879,6 +1963,39 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
+[[package]]
+name = "hotpath"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "6189994b73d164becf3187cc046e68b3f7e6c29e2ef61c30c9e75601fd148daf"
+dependencies = [
+ "arc-swap",
+ "cfg-if",
+ "clap",
+ "colored",
+ "crossbeam-channel",
+ "eyre",
+ "hdrhistogram",
+ "hotpath-macros",
+ "prettytable-rs",
+ "quanta",
+ "serde",
+ "serde_json",
+ "tokio",
+ "ureq",
+]
+
+[[package]]
+name = "hotpath-macros"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "15ec66a715a703f2e7b7b84f468b4f0187172ef998baa62fcce8d1174d9edb11"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "http"
 version = "0.2.12"
@@ -1890,6 +2007,17 @@ dependencies = [
  "itoa",
 ]
 
+[[package]]
+name = "http"
+version = "1.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f4a85d31aea989eead29a3aaf9e1115a180df8282431156e533de47660892565"
+dependencies = [
+ "bytes",
+ "fnv",
+ "itoa",
+]
+
 [[package]]
 name = "httparse"
 version = "1.9.5"
@@ -1931,6 +2059,87 @@ dependencies = [
  "cc",
 ]
 
+[[package]]
+name = "icu_collections"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43"
+dependencies = [
+ "displaydoc",
+ "potential_utf",
+ "yoke",
+ "zerofrom",
+ "zerovec",
+]
+
+[[package]]
+name = "icu_locale_core"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6"
+dependencies = [
+ "displaydoc",
+ "litemap",
+ "tinystr",
+ "writeable",
+ "zerovec",
+]
+
+[[package]]
+name = "icu_normalizer"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599"
+dependencies = [
+ "icu_collections",
+ "icu_normalizer_data",
+ "icu_properties",
+ "icu_provider",
+ "smallvec",
+ "zerovec",
+]
+
+[[package]]
+name = "icu_normalizer_data"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a"
+
+[[package]]
+name = "icu_properties"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "e93fcd3157766c0c8da2f8cff6ce651a31f0810eaa1c51ec363ef790bbb5fb99"
+dependencies = [
+ "icu_collections",
+ "icu_locale_core",
+ "icu_properties_data",
+ "icu_provider",
+ "zerotrie",
+ "zerovec",
+]
+
+[[package]]
+name = "icu_properties_data"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "02845b3647bb045f1100ecd6480ff52f34c35f82d9880e029d329c21d1054899"
+
+[[package]]
+name = "icu_provider"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614"
+dependencies = [
+ "displaydoc",
+ "icu_locale_core",
+ "writeable",
+ "yoke",
+ "zerofrom",
+ "zerotrie",
+ "zerovec",
+]
+
 [[package]]
 name = "idna"
 version = "0.5.0"
@@ -1941,20 +2150,47 @@ dependencies = [
  "unicode-normalization",
 ]
 
+[[package]]
+name = "idna"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de"
+dependencies = [
+ "idna_adapter",
+ "smallvec",
+ "utf8_iter",
+]
+
+[[package]]
+name = "idna_adapter"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344"
+dependencies = [
+ "icu_normalizer",
+ "icu_properties",
+]
+
 [[package]]
 name = "impl-more"
 version = "0.1.8"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "aae21c3177a27788957044151cc2800043d127acaa460a47ebb9b84dfa2c6aa0"
 
+[[package]]
+name = "indenter"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "964de6e86d545b246d84badc0fef527924ace5134f30641c203ef52ba83f58d5"
+
 [[package]]
 name = "indexmap"
-version = "2.5.0"
+version = "2.12.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5"
+checksum = "6717a8d2a5a929a1a2eb43a12812498ed141a0bcfb7e8f7844fbdbe4303bba9f"
 dependencies = [
  "equivalent",
- "hashbrown",
+ "hashbrown 0.16.0",
 ]
 
 [[package]]
@@ -2118,9 +2354,9 @@ dependencies = [
 
 [[package]]
 name = "libc"
-version = "0.2.158"
+version = "0.2.177"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439"
+checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976"
 
 [[package]]
 name = "libm"
@@ -2128,12 +2364,34 @@ version = "0.2.8"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058"
 
+[[package]]
+name = "libredox"
+version = "0.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb"
+dependencies = [
+ "bitflags 2.6.0",
+ "libc",
+]
+
 [[package]]
 name = "linux-raw-sys"
 version = "0.4.14"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"
 
+[[package]]
+name = "litemap"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77"
+
+[[package]]
+name = "litrs"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "11d3d7f243d5c5a8b9bb5d6dd2b1602c0cb0b9db1621bafc7ed66e35ff9fe092"
+
 [[package]]
 name = "local-channel"
 version = "0.1.5"
@@ -2163,9 +2421,9 @@ dependencies = [
 
 [[package]]
 name = "log"
-version = "0.4.22"
+version = "0.4.28"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
+checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432"
 
 [[package]]
 name = "lz4_flex"
@@ -2371,12 +2629,12 @@ dependencies = [
 ]
 
 [[package]]
-name = "object"
-version = "0.36.4"
+name = "object-pool"
+version = "0.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "084f1a5821ac4c651660a94a7153d27ac9d8a53736203f58b31945ded098070a"
+checksum = "ceffa2e6ccecd71e60a0f06b655df2c66acd1c0c892dafefc96fd49d65f71d53"
 dependencies = [
- "memchr",
+ "parking_lot",
 ]
 
 [[package]]
@@ -2447,7 +2705,7 @@ dependencies = [
  "libc",
  "redox_syscall",
  "smallvec",
- "windows-targets",
+ "windows-targets 0.52.6",
 ]
 
 [[package]]
@@ -2471,7 +2729,7 @@ dependencies = [
  "flate2",
  "futures",
  "half",
- "hashbrown",
+ "hashbrown 0.14.5",
  "lz4_flex",
  "num",
  "num-bigint",
@@ -2629,6 +2887,15 @@ dependencies = [
  "plotters-backend",
 ]
 
+[[package]]
+name = "potential_utf"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b73949432f5e2a09657003c25bca5e19a0e9c84f8058ca374f49e0ebe605af77"
+dependencies = [
+ "zerovec",
+]
+
 [[package]]
 name = "powerfmt"
 version = "0.2.0"
@@ -2654,6 +2921,19 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "prettytable-rs"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "eea25e07510aa6ab6547308ebe3c036016d162b8da920dbb079e3ba8acf3d95a"
+dependencies = [
+ "encode_unicode",
+ "is-terminal",
+ "lazy_static",
+ "term",
+ "unicode-width",
+]
+
 [[package]]
 name = "proc-macro2"
 version = "1.0.86"
@@ -2767,6 +3047,21 @@ dependencies = [
  "thiserror",
 ]
 
+[[package]]
+name = "quanta"
+version = "0.12.6"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7"
+dependencies = [
+ "crossbeam-utils",
+ "libc",
+ "once_cell",
+ "raw-cpuid",
+ "wasi",
+ "web-sys",
+ "winapi",
+]
+
 [[package]]
 name = "quick-protobuf"
 version = "0.8.1"
@@ -2815,6 +3110,15 @@ dependencies = [
  "getrandom",
 ]
 
+[[package]]
+name = "raw-cpuid"
+version = "11.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186"
+dependencies = [
+ "bitflags 2.6.0",
+]
+
 [[package]]
 name = "rayon"
 version = "1.10.0"
@@ -2844,6 +3148,17 @@ dependencies = [
  "bitflags 2.6.0",
 ]
 
+[[package]]
+name = "redox_users"
+version = "0.4.6"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43"
+dependencies = [
+ "getrandom",
+ "libredox",
+ "thiserror",
+]
+
 [[package]]
 name = "regex"
 version = "1.10.6"
@@ -2899,20 +3214,26 @@ name = "remote_write"
 version = "2.2.0-alpha"
 dependencies = [
  "anyhow",
- "async-trait",
  "bytes",
- "deadpool",
+ "object-pool",
  "once_cell",
  "pb_types",
  "prost",
- "tokio",
 ]
 
 [[package]]
-name = "rustc-demangle"
-version = "0.1.24"
+name = "ring"
+version = "0.17.14"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
+checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
+dependencies = [
+ "cc",
+ "cfg-if",
+ "getrandom",
+ "libc",
+ "untrusted",
+ "windows-sys 0.52.0",
+]
 
 [[package]]
 name = "rustc_version"
@@ -2936,6 +3257,50 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
+[[package]]
+name = "rustls"
+version = "0.23.34"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "6a9586e9ee2b4f8fab52a0048ca7334d7024eef48e2cb9407e3497bb7cab7fa7"
+dependencies = [
+ "log",
+ "once_cell",
+ "ring",
+ "rustls-pki-types",
+ "rustls-webpki",
+ "subtle",
+ "zeroize",
+]
+
+[[package]]
+name = "rustls-pemfile"
+version = "2.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50"
+dependencies = [
+ "rustls-pki-types",
+]
+
+[[package]]
+name = "rustls-pki-types"
+version = "1.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "94182ad936a0c91c324cd46c6511b9510ed16af436d7b5bab34beab0afd55f7a"
+dependencies = [
+ "zeroize",
+]
+
+[[package]]
+name = "rustls-webpki"
+version = "0.103.8"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "2ffdfa2f5286e2247234e03f680868ac2815974dc39e00ea15adc445d0aafe52"
+dependencies = [
+ "ring",
+ "rustls-pki-types",
+ "untrusted",
+]
+
 [[package]]
 name = "rustversion"
 version = "1.0.17"
@@ -3157,6 +3522,16 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
+[[package]]
+name = "socket2"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881"
+dependencies = [
+ "libc",
+ "windows-sys 0.60.2",
+]
+
 [[package]]
 name = "sqlparser"
 version = "0.51.0"
@@ -3178,6 +3553,12 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "stable_deref_trait"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596"
+
 [[package]]
 name = "static_assertions"
 version = "1.1.0"
@@ -3229,6 +3610,17 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "synstructure"
+version = "0.13.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "temp-dir"
 version = "0.1.14"
@@ -3248,6 +3640,17 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
+[[package]]
+name = "term"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "c59df8ac95d96ff9bede18eb7300b0fda5e5d8d90960e76f8e14ae765eedbf1f"
+dependencies = [
+ "dirs-next",
+ "rustversion",
+ "winapi",
+]
+
 [[package]]
 name = "test-log"
 version = "0.2.16"
@@ -3311,37 +3714,6 @@ dependencies = [
  "ordered-float",
 ]
 
-[[package]]
-name = "tikv-jemalloc-ctl"
-version = "0.5.4"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "619bfed27d807b54f7f776b9430d4f8060e66ee138a28632ca898584d462c31c"
-dependencies = [
- "libc",
- "paste",
- "tikv-jemalloc-sys",
-]
-
-[[package]]
-name = "tikv-jemalloc-sys"
-version = "0.5.4+5.3.0-patched"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "9402443cb8fd499b6f327e40565234ff34dbda27460c5b47db0db77443dd85d1"
-dependencies = [
- "cc",
- "libc",
-]
-
-[[package]]
-name = "tikv-jemallocator"
-version = "0.5.4"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "965fe0c26be5c56c94e38ba547249074803efd52adfb66de62107d95aab3eaca"
-dependencies = [
- "libc",
- "tikv-jemalloc-sys",
-]
-
 [[package]]
 name = "time"
 version = "0.3.37"
@@ -3384,6 +3756,16 @@ dependencies = [
  "crunchy",
 ]
 
+[[package]]
+name = "tinystr"
+version = "0.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "42d3e9c45c09de15d06dd8acf5f4e0e399e85927b7f00711024eb7ae10fa4869"
+dependencies = [
+ "displaydoc",
+ "zerovec",
+]
+
 [[package]]
 name = "tinytemplate"
 version = "1.2.1"
@@ -3411,27 +3793,26 @@ checksum = 
"1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tokio"
-version = "1.40.0"
+version = "1.48.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998"
+checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408"
 dependencies = [
- "backtrace",
  "bytes",
  "libc",
  "mio",
  "parking_lot",
  "pin-project-lite",
  "signal-hook-registry",
- "socket2",
+ "socket2 0.6.1",
  "tokio-macros",
- "windows-sys 0.52.0",
+ "windows-sys 0.61.2",
 ]
 
 [[package]]
 name = "tokio-macros"
-version = "2.4.0"
+version = "2.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
+checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -3597,6 +3978,45 @@ version = "0.1.13"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d"
 
+[[package]]
+name = "untrusted"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
+
+[[package]]
+name = "ureq"
+version = "3.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "99ba1025f18a4a3fc3e9b48c868e9beb4f24f4b4b1a325bada26bd4119f46537"
+dependencies = [
+ "base64",
+ "cookie_store",
+ "flate2",
+ "log",
+ "percent-encoding",
+ "rustls",
+ "rustls-pemfile",
+ "rustls-pki-types",
+ "serde",
+ "serde_json",
+ "ureq-proto",
+ "utf-8",
+ "webpki-roots",
+]
+
+[[package]]
+name = "ureq-proto"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "60b4531c118335662134346048ddb0e54cc86bd7e81866757873055f0e38f5d2"
+dependencies = [
+ "base64",
+ "http 1.3.1",
+ "httparse",
+ "log",
+]
+
 [[package]]
 name = "url"
 version = "2.5.2"
@@ -3604,10 +4024,22 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c"
 dependencies = [
  "form_urlencoded",
- "idna",
+ "idna 0.5.0",
  "percent-encoding",
 ]
 
+[[package]]
+name = "utf-8"
+version = "0.7.6"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
+
+[[package]]
+name = "utf8_iter"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
+
 [[package]]
 name = "utf8parse"
 version = "0.2.2"
@@ -3716,6 +4148,15 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "webpki-roots"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "32b130c0d2d49f8b6889abc456e795e82525204f27c42cf767cf0d7734e089b8"
+dependencies = [
+ "rustls-pki-types",
+]
+
 [[package]]
 name = "which"
 version = "4.4.2"
@@ -3765,16 +4206,22 @@ version = "0.52.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
 dependencies = [
- "windows-targets",
+ "windows-targets 0.52.6",
 ]
 
+[[package]]
+name = "windows-link"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
+
 [[package]]
 name = "windows-sys"
 version = "0.52.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
 dependencies = [
- "windows-targets",
+ "windows-targets 0.52.6",
 ]
 
 [[package]]
@@ -3783,7 +4230,25 @@ version = "0.59.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b"
 dependencies = [
- "windows-targets",
+ "windows-targets 0.52.6",
+]
+
+[[package]]
+name = "windows-sys"
+version = "0.60.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb"
+dependencies = [
+ "windows-targets 0.53.5",
+]
+
+[[package]]
+name = "windows-sys"
+version = "0.61.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc"
+dependencies = [
+ "windows-link",
 ]
 
 [[package]]
@@ -3792,14 +4257,31 @@ version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
 dependencies = [
- "windows_aarch64_gnullvm",
- "windows_aarch64_msvc",
- "windows_i686_gnu",
- "windows_i686_gnullvm",
- "windows_i686_msvc",
- "windows_x86_64_gnu",
- "windows_x86_64_gnullvm",
- "windows_x86_64_msvc",
+ "windows_aarch64_gnullvm 0.52.6",
+ "windows_aarch64_msvc 0.52.6",
+ "windows_i686_gnu 0.52.6",
+ "windows_i686_gnullvm 0.52.6",
+ "windows_i686_msvc 0.52.6",
+ "windows_x86_64_gnu 0.52.6",
+ "windows_x86_64_gnullvm 0.52.6",
+ "windows_x86_64_msvc 0.52.6",
+]
+
+[[package]]
+name = "windows-targets"
+version = "0.53.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3"
+dependencies = [
+ "windows-link",
+ "windows_aarch64_gnullvm 0.53.1",
+ "windows_aarch64_msvc 0.53.1",
+ "windows_i686_gnu 0.53.1",
+ "windows_i686_gnullvm 0.53.1",
+ "windows_i686_msvc 0.53.1",
+ "windows_x86_64_gnu 0.53.1",
+ "windows_x86_64_gnullvm 0.53.1",
+ "windows_x86_64_msvc 0.53.1",
 ]
 
 [[package]]
@@ -3808,48 +4290,96 @@ version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
 
+[[package]]
+name = "windows_aarch64_gnullvm"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53"
+
 [[package]]
 name = "windows_aarch64_msvc"
 version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
 
+[[package]]
+name = "windows_aarch64_msvc"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006"
+
 [[package]]
 name = "windows_i686_gnu"
 version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
 
+[[package]]
+name = "windows_i686_gnu"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3"
+
 [[package]]
 name = "windows_i686_gnullvm"
 version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
 
+[[package]]
+name = "windows_i686_gnullvm"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c"
+
 [[package]]
 name = "windows_i686_msvc"
 version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
 
+[[package]]
+name = "windows_i686_msvc"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2"
+
 [[package]]
 name = "windows_x86_64_gnu"
 version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
 
+[[package]]
+name = "windows_x86_64_gnu"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499"
+
 [[package]]
 name = "windows_x86_64_gnullvm"
 version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
 
+[[package]]
+name = "windows_x86_64_gnullvm"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1"
+
 [[package]]
 name = "windows_x86_64_msvc"
 version = "0.52.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
 
+[[package]]
+name = "windows_x86_64_msvc"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650"
+
 [[package]]
 name = "winnow"
 version = "0.6.20"
@@ -3859,6 +4389,12 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "writeable"
+version = "0.6.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9"
+
 [[package]]
 name = "xz2"
 version = "0.1.7"
@@ -3868,6 +4404,29 @@ dependencies = [
  "lzma-sys",
 ]
 
+[[package]]
+name = "yoke"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "72d6e5c6afb84d73944e5cedb052c4680d5657337201555f9f2a16b7406d4954"
+dependencies = [
+ "stable_deref_trait",
+ "yoke-derive",
+ "zerofrom",
+]
+
+[[package]]
+name = "yoke-derive"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+ "synstructure",
+]
+
 [[package]]
 name = "zerocopy"
 version = "0.7.35"
@@ -3889,6 +4448,66 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "zerofrom"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5"
+dependencies = [
+ "zerofrom-derive",
+]
+
+[[package]]
+name = "zerofrom-derive"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+ "synstructure",
+]
+
+[[package]]
+name = "zeroize"
+version = "1.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0"
+
+[[package]]
+name = "zerotrie"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2a59c17a5562d507e4b54960e8569ebee33bee890c70aa3fe7b97e85a9fd7851"
+dependencies = [
+ "displaydoc",
+ "yoke",
+ "zerofrom",
+]
+
+[[package]]
+name = "zerovec"
+version = "0.11.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6c28719294829477f525be0186d13efa9a3c602f7ec202ca9e353d310fb9a002"
+dependencies = [
+ "yoke",
+ "zerofrom",
+ "zerovec-derive",
+]
+
+[[package]]
+name = "zerovec-derive"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "zstd"
 version = "0.13.2"
diff --git a/Cargo.toml b/Cargo.toml
index b4bab12b..2ced2a40 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -51,11 +51,13 @@ columnar_storage = { path = "src/columnar_storage" }
 common = { path = "src/common" }
 criterion = "0.5"
 datafusion = "43"
-deadpool = "0.10"
 futures = "0.3"
+hotpath = "0.5.2"
 itertools = "0.3"
 lazy_static = "1"
 metric_engine = { path = "src/metric_engine" }
+num_cpus = "1"
+object-pool = "0.6"
 object_store = { version = "0.11" }
 once_cell = "1"
 parquet = { version = "53" }
diff --git a/docs/assets/remote-write-concurrent-performance.png b/docs/assets/remote-write-concurrent-performance.png
index 15d185a1..55b04fb8 100644
Binary files a/docs/assets/remote-write-concurrent-performance.png and b/docs/assets/remote-write-concurrent-performance.png differ
diff --git a/docs/assets/remote-write-memory-performance.png b/docs/assets/remote-write-memory-performance.png
index c5859820..5b113eed 100644
Binary files a/docs/assets/remote-write-memory-performance.png and b/docs/assets/remote-write-memory-performance.png differ
diff --git a/docs/assets/remote-write-sequential-performance.png b/docs/assets/remote-write-sequential-performance.png
index a401c285..c5f7942c 100644
Binary files a/docs/assets/remote-write-sequential-performance.png and b/docs/assets/remote-write-sequential-performance.png differ
diff --git a/src/benchmarks/Cargo.toml b/src/benchmarks/Cargo.toml
index de738d01..56cdfa32 100644
--- a/src/benchmarks/Cargo.toml
+++ b/src/benchmarks/Cargo.toml
@@ -29,18 +29,19 @@ description.workspace = true
 name = "parser_mem"
 path = "src/bin/parser_mem.rs"
 
-[[bin]]
-name = "pool_stats"
-path = "src/bin/pool_stats.rs"
-
 [features]
+default = ["hotpath", "hotpath-alloc-bytes-total"]
+hotpath = []
+hotpath-alloc-bytes-total = ["hotpath", "hotpath/hotpath-alloc-bytes-total"]
 unsafe-split = ["remote_write/unsafe-split"]
 
 [dependencies]
+anyhow = { workspace = true }
 bytes = { workspace = true }
 columnar_storage = { workspace = true }
 common = { workspace = true }
-deadpool = { workspace = true }
+hotpath = { workspace = true }
+num_cpus = { workspace = true }
 pb_types = { workspace = true }
 prost = { workspace = true }
 protobuf = "3.7"
@@ -48,21 +49,18 @@ quick-protobuf = "0.8"
 remote_write = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
-tikv-jemalloc-ctl = "0.5"
 tokio = { workspace = true }
 toml = { workspace = true }
 tracing = { workspace = true }
 tracing-subscriber = { workspace = true }
 
 [target.'cfg(not(target_env = "msvc"))'.dependencies]
-tikv-jemallocator = "0.5"
 
 [build-dependencies]
 protobuf-codegen = "3.7"
 
 [dev-dependencies]
 criterion = { workspace = true }
-num_cpus = "1.16"
 
 [[bench]]
 name = "bench"
diff --git a/src/benchmarks/benches/bench.rs b/src/benchmarks/benches/bench.rs
index b932a441..13fe083b 100644
--- a/src/benchmarks/benches/bench.rs
+++ b/src/benchmarks/benches/bench.rs
@@ -64,12 +64,6 @@ fn bench_remote_write(c: &mut Criterion) {
     let concurrent_scales = config.remote_write.concurrent_scales.clone();
     let bench = RefCell::new(RemoteWriteBench::new(config.remote_write));
 
-    let rt = tokio::runtime::Builder::new_multi_thread()
-        .worker_threads(num_cpus::get())
-        .enable_all()
-        .build()
-        .unwrap();
-
     // Sequential parse bench.
     let mut group = c.benchmark_group("remote_write_sequential");
 
@@ -85,10 +79,10 @@ fn bench_remote_write(c: &mut Criterion) {
 
         group.bench_with_input(
             BenchmarkId::new("pooled", n),
-            &(&bench, &rt, n),
-            |b, (bench, rt, scale)| {
+            &(&bench, n),
+            |b, (bench, scale)| {
                 let bench = bench.borrow();
-                b.iter(|| rt.block_on(bench.pooled_parser_sequential(*scale)).unwrap())
+                b.iter(|| bench.pooled_parser_sequential(*scale).unwrap())
             },
         );
 
@@ -118,43 +112,37 @@ fn bench_remote_write(c: &mut Criterion) {
     for &scale in &concurrent_scales {
         group.bench_with_input(
             BenchmarkId::new("prost", scale),
-            &(&bench, &rt, scale),
-            |b, (bench, rt, scale)| {
+            &(&bench, scale),
+            |b, (bench, scale)| {
                 let bench = bench.borrow();
-                b.iter(|| rt.block_on(bench.prost_parser_concurrent(*scale)).unwrap())
+                b.iter(|| bench.prost_parser_concurrent(*scale).unwrap())
             },
         );
 
         group.bench_with_input(
             BenchmarkId::new("pooled", scale),
-            &(&bench, &rt, scale),
-            |b, (bench, rt, scale)| {
+            &(&bench, scale),
+            |b, (bench, scale)| {
                 let bench = bench.borrow();
-                b.iter(|| rt.block_on(bench.pooled_parser_concurrent(*scale)).unwrap())
+                b.iter(|| bench.pooled_parser_concurrent(*scale).unwrap())
             },
         );
 
         group.bench_with_input(
             BenchmarkId::new("quick_protobuf", scale),
-            &(&bench, &rt, scale),
-            |b, (bench, rt, scale)| {
+            &(&bench, scale),
+            |b, (bench, scale)| {
                 let bench = bench.borrow();
-                b.iter(|| {
-                    rt.block_on(bench.quick_protobuf_parser_concurrent(*scale))
-                        .unwrap()
-                })
+                b.iter(|| bench.quick_protobuf_parser_concurrent(*scale).unwrap())
             },
         );
 
         group.bench_with_input(
             BenchmarkId::new("rust_protobuf", scale),
-            &(&bench, &rt, scale),
-            |b, (bench, rt, scale)| {
+            &(&bench, scale),
+            |b, (bench, scale)| {
                 let bench = bench.borrow();
-                b.iter(|| {
-                    rt.block_on(bench.rust_protobuf_parser_concurrent(*scale))
-                        .unwrap()
-                })
+                b.iter(|| bench.rust_protobuf_parser_concurrent(*scale).unwrap())
             },
         );
     }
diff --git a/src/benchmarks/remote_write_memory_bench.py b/src/benchmarks/remote_write_memory_bench.py
index 42a2e502..44f257c6 100644
--- a/src/benchmarks/remote_write_memory_bench.py
+++ b/src/benchmarks/remote_write_memory_bench.py
@@ -16,12 +16,9 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import argparse
 import subprocess
-import json
 import sys
-import os
-from typing import Dict, Any
-import argparse
 
 try:
     from tabulate import tabulate
@@ -30,208 +27,95 @@ except ImportError:
     print("Please install it with: pip3 install tabulate")
     sys.exit(1)
 
-
-class MemoryBenchmark:
-    def __init__(self, scale, mode, use_unsafe=False):
-        self.project_path = "."
-        self.data_path = "../remote_write/tests/workloads/1709380533560664458.data"
-        self.scale = scale
-        self.mode = mode
-        self.use_unsafe = use_unsafe
-        self.parsers = [
-            "pooled",
-            "prost",
-            "rust-protobuf",
-            "quick-protobuf",
-        ]
-
-    def build_binary(self) -> bool:
-        features_msg = " with unsafe-split" if self.use_unsafe else ""
-        print(f"Building binary{features_msg}...")
-        try:
-            bin_name = "parser_mem"
-            build_cmd = ["cargo", "build", "--release", "--bin", bin_name]
-            if self.use_unsafe:
-                build_cmd.extend(["--features", "unsafe-split"])
-            result = subprocess.run(
-                build_cmd,
-                cwd=self.project_path,
-                check=False,
-            )
-            if result.returncode != 0:
-                print("Failed to build binary")
-                return False
-            return True
-        except Exception as e:
-            print(f"Failed to build binary: {e}")
-            return False
-
-    def run_parser(self, parser: str, mode: str, scale: int, bin_name: str) -> Dict[str, Any]:
-        binary_path = f"../../target/release/{bin_name}"
-
-        cmd = [binary_path, mode, str(scale), parser]
-
-        try:
-            result = subprocess.run(
-                cmd,
-                cwd=self.project_path,
-                capture_output=True,
-                text=True,
-                timeout=300,  # 5 minute timeout
-            )
-
-            if result.returncode != 0:
-                print(f"Error running {parser}: {result.stderr}")
-                return {}
-
-            return json.loads(result.stdout.strip())
-
-        except subprocess.TimeoutExpired:
-            print(f"Timeout running {parser} {mode} {scale}")
-            return {}
-        except json.JSONDecodeError as e:
-            print(f"Failed to parse JSON from {parser}: {e}")
-            print(f"Raw output: {result.stdout}")
-            return {}
-        except FileNotFoundError:
-            print(f"Binary not found: {binary_path}")
-            print(f"Please run: cargo build --release --bins")
-            return {}
-        except Exception as e:
-            print(f"Exception running {parser}: {e}")
-            return {}
-
-    def run_benchmarks(self) -> Dict[str, Dict[str, Any]]:
-        results = {}
-        successful_count = 0
-        total_count = len(self.parsers)
-
-        print(f"\nRunning benchmarks for {total_count} parsers...")
-
-        for i, parser in enumerate(self.parsers, 1):
-            print(f"\n[{i}/{total_count}] Testing {parser}...")
-            print(f"Running {self.mode} mode with scale {self.scale}...")
-            result = self.run_parser(
-                parser, self.mode, self.scale, "parser_mem")
-
-            if result:
-                result["parser"] = parser
-                results[parser] = result
-                successful_count += 1
-                print(f"Success")
-            else:
-                print(f"Failed")
-
-        print(
-            f"\nCompleted: {successful_count}/{total_count} parsers succeeded")
-
-        if successful_count == total_count:
-            print("All parsers succeeded - generating report...")
-            return results
-        else:
-            print("Some parsers failed - skipping report generation")
-            return {}
-
-    def analyze_results(self, results: Dict[str, Dict[str, Any]]):
-        if not results:
-            print("\nNo results to analyze - all parsers failed or were skipped")
-            return
-
-        print(f"\n{'='*80}")
-        print("MEMORY BENCHMARK RESULTS")
-        print(f"{'='*80}")
-        print(f"Mode: {self.mode.upper()}, Scale: {self.scale}")
-        print()
-
-        headers = [
-            "Parser",
-            "ThreadAlloc",
-            "ThreadDealloc",
-            "Allocated",
-            "Active",
-            "Metadata",
-            "Mapped",
-            "Resident",
-            "Retained",
-        ]
-
-        table_data = []
-        for parser in self.parsers:
-            if parser in results:
-                result = results[parser]
-                memory = result.get("memory", {})
-                row = [
-                    parser,
-                    f"{memory.get('thread_allocated_diff', 0):,}",
-                    f"{memory.get('thread_deallocated_diff', 0):,}",
-                    f"{memory.get('allocated', 0):,}",
-                    f"{memory.get('active', 0):,}",
-                    f"{memory.get('metadata', 0):,}",
-                    f"{memory.get('mapped', 0):,}",
-                    f"{memory.get('resident', 0):,}",
-                    f"{memory.get('retained', 0):,}",
-                ]
-                table_data.append(row)
-
-        print("SUMMARY TABLE (All values in bytes)")
+PARSERS = ["pooled", "prost", "rust-protobuf", "quick-protobuf"]
+BINARY_PATH = "../../target/release/parser_mem"
+
+
+def build_binary(use_unsafe: bool):
+    print(f"Building binary{' with unsafe-split' if use_unsafe else ''}...")
+    cmd = ["cargo", "build", "--release", "--bin", "parser_mem"]
+    if use_unsafe:
+        cmd.extend(["--features", "unsafe-split"])
+    result = subprocess.run(cmd, cwd=".")
+    return result.returncode == 0
+
+
+def parse_total_alloc(stdout: str):
+    for line in stdout.splitlines():
+        if line.startswith("Total allocated bytes (cumulative):"):
+            try:
+                raw = line.split(":", 1)[1].strip().split(" ")[0]
+                return int(raw)
+            except Exception:
+                return None
+    return None
+
+
+def run_parser(parser: str, mode: str, scale: int):
+    cmd = [BINARY_PATH, mode, str(scale), parser]
+    try:
+        res = subprocess.run(
+            cmd, cwd=".", capture_output=True, text=True, timeout=300)
+    except Exception as e:
+        print(f"[error] run {parser}: {e}", file=sys.stderr)
+        return None
+    if res.returncode != 0:
+        print(f"[error] run {parser} failed: {res.stderr}", file=sys.stderr)
+        return None
+    total = parse_total_alloc(res.stdout)
+    if total is None:
         print(
-            tabulate(
-                table_data,
-                headers=headers,
-                tablefmt="grid",
-                stralign="right",
-                numalign="right",
-            )
+            f"[error] parse total bytes failed for {parser}. Output:\n{res.stdout}",
+            file=sys.stderr,
+        )
+    return total
+
+
+def print_table(results: dict, mode: str, scale: int):
+    print(f"\n{'='*60}")
+    print("MEMORY BENCHMARK RESULTS")
+    print(f"{'='*60}")
+    print(f"Mode: {mode}, Scale: {scale}")
+    headers = ["Parser", "TotalAllocatedBytes"]
+    rows = [[p, f"{results[p]:,}"] for p in PARSERS if p in results]
+    print(
+        tabulate(
+            rows,
+            headers=headers,
+            tablefmt="grid",
+            stralign="right",
+            numalign="right",
         )
-
-
-def main():
-    parser = argparse.ArgumentParser(
-        description="Memory benchmark for protobuf parsers"
-    )
-    parser.add_argument(
-        "--unsafe", action="store_true", help="Enable unsafe-split feature"
-    )
-    parser.add_argument(
-        "--mode",
-        choices=["sequential", "concurrent"],
-        default="sequential",
-        help="Test mode to run (default: sequential)",
-    )
-    parser.add_argument(
-        "--scale",
-        type=int,
-        default=10,
-        help="Scale value for benchmark (default: 10)",
     )
 
-    args = parser.parse_args()
 
+def main():
+    ap = argparse.ArgumentParser(
+        description="Memory benchmark for protobuf parsers")
+    ap.add_argument("--unsafe", action="store_true",
+                    help="Enable unsafe-split feature")
+    ap.add_argument(
+        "--mode", choices=["sequential", "concurrent"], default="sequential", help="Run mode")
+    ap.add_argument("--scale", type=int, default=10, help="Benchmark scale")
+    args = ap.parse_args()
     if args.scale <= 0:
-        print(f"Invalid scale value '{args.scale}', scale must be positive")
-        sys.exit(1)
-
-    data_path = "../remote_write/tests/workloads/1709380533560664458.data"
-    if not os.path.exists(data_path):
-        print(f"Test data file not found at {data_path}")
-        print("Please ensure test data exists before running benchmarks")
-        sys.exit(1)
-
-    benchmark = MemoryBenchmark(
-        scale=args.scale, mode=args.mode, use_unsafe=args.unsafe
-    )
-
-    if not benchmark.build_binary():
-        sys.exit(1)
-
-    print(f"\nRunning memory benchmarks...")
-    print(f"Mode: {args.mode}")
-    print(f"Scale: {args.scale}")
-    print(f"Unsafe optimization: {'enabled' if args.unsafe else 'disabled'}")
-
-    results = benchmark.run_benchmarks()
-    benchmark.analyze_results(results)
+        print("scale must be positive", file=sys.stderr)
+        return 1
+    if not build_binary(args.unsafe):
+        print("build failed", file=sys.stderr)
+        return 1
+    results = {}
+    failed = False
+    for parser in PARSERS:
+        total = run_parser(parser, args.mode, args.scale)
+        if total is None:
+            failed = True
+        else:
+            results[parser] = total
+    if results:
+        print_table(results, args.mode, args.scale)
+    return 1 if failed else 0
 
 
 if __name__ == "__main__":
-    main()
+    sys.exit(main())
diff --git a/src/benchmarks/src/bin/parser_mem.rs b/src/benchmarks/src/bin/parser_mem.rs
index 0028cd57..a993ba6f 100644
--- a/src/benchmarks/src/bin/parser_mem.rs
+++ b/src/benchmarks/src/bin/parser_mem.rs
@@ -15,124 +15,118 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use benchmarks::util::{MemoryBenchConfig, MemoryStats};
+use anyhow::Result;
+use benchmarks::util::{human_bytes, run_concurrent_threads, MemoryBenchConfig};
+use bytes::Bytes;
+use hotpath::{GuardBuilder, MetricType, MetricsProvider, Reporter};
 use pb_types::WriteRequest as ProstWriteRequest;
 use prost::Message;
 use protobuf::Message as ProtobufMessage;
 use quick_protobuf::{BytesReader, MessageRead};
 use remote_write::pooled_parser::PooledParser;
-use tikv_jemallocator::Jemalloc;
 
-#[global_allocator]
-static ALLOC: Jemalloc = Jemalloc;
-
-#[tokio::main(flavor = "current_thread")]
-async fn main() -> Result<(), Box<dyn std::error::Error>> {
+fn main() -> Result<()> {
     let config = MemoryBenchConfig::from_args();
     let args: Vec<String> = std::env::args().collect();
     let parser = args.get(3).map(|s| s.as_str()).unwrap_or("pooled");
 
-    let start_stats = MemoryStats::collect()?;
+    let _guard = GuardBuilder::new("parser_mem")
+        .reporter(Box::new(TotalAllocReporter))
+        .build();
 
     match config.mode.as_str() {
         "sequential" => match parser {
-            "pooled" => {
-                let parser = PooledParser;
-                for _ in 0..config.scale {
-                    let _ = parser.decode_async(config.test_data.clone()).await?;
-                }
-            }
-            "prost" => {
-                for _ in 0..config.scale {
-                    ProstWriteRequest::decode(config.test_data.clone())?;
-                }
-            }
-            "rust-protobuf" => {
-                for _ in 0..config.scale {
-                    let _ = benchmarks::rust_protobuf_remote_write::WriteRequest::parse_from_bytes(
-                        &config.test_data,
-                    )?;
-                }
-            }
-            "quick-protobuf" => {
-                for _ in 0..config.scale {
-                    let mut reader = BytesReader::from_bytes(&config.test_data);
-                    let _ = benchmarks::quick_protobuf_remote_write::WriteRequest::from_reader(
-                        &mut reader,
-                        &config.test_data,
-                    )?;
-                }
-            }
+            "pooled" => pooled_worker(config.test_data.clone(), config.scale),
+            "prost" => prost_worker(config.test_data.clone(), config.scale),
+            "rust-protobuf" => rust_protobuf_worker(config.test_data.clone(), config.scale),
+            "quick-protobuf" => quick_protobuf_worker(config.test_data.clone(), config.scale),
             other => panic!("unknown parser: {}", other),
         },
         "concurrent" => match parser {
-            "pooled" => {
-                let mut handles = Vec::new();
-                for _ in 0..config.scale {
-                    let data_clone = config.test_data.clone();
-                    let handle = tokio::spawn(async move {
-                        let parser = PooledParser;
-                        let _ = parser.decode_async(data_clone).await;
-                    });
-                    handles.push(handle);
-                }
-                for handle in handles {
-                    handle.await?;
-                }
-            }
-            "prost" => {
-                let mut handles = Vec::new();
-                for _ in 0..config.scale {
-                    let data_clone = config.test_data.clone();
-                    let handle = tokio::spawn(async move {
-                        let _ = ProstWriteRequest::decode(data_clone);
-                    });
-                    handles.push(handle);
-                }
-                for handle in handles {
-                    handle.await?;
-                }
-            }
-            "rust-protobuf" => {
-                let mut handles = Vec::new();
-                for _ in 0..config.scale {
-                    let data_clone = config.test_data.clone();
-                    let handle = tokio::spawn(async move {
-                        let _ =
-                            benchmarks::rust_protobuf_remote_write::WriteRequest::parse_from_bytes(
-                                &data_clone,
-                            );
-                    });
-                    handles.push(handle);
-                }
-                for handle in handles {
-                    handle.await?;
-                }
-            }
-            "quick-protobuf" => {
-                let mut handles = Vec::new();
-                for _ in 0..config.scale {
-                    let data_clone = config.test_data.clone();
-                    let handle = tokio::spawn(async move {
-                        let mut reader = BytesReader::from_bytes(&data_clone);
-                        let _ = benchmarks::quick_protobuf_remote_write::WriteRequest::from_reader(
-                            &mut reader,
-                            &data_clone,
-                        );
-                    });
-                    handles.push(handle);
-                }
-                for handle in handles {
-                    handle.await?;
-                }
-            }
+            "pooled" => run_concurrent_threads(config.scale, move |n| {
+                pooled_worker(config.test_data.clone(), n);
+                Ok(())
+            })
+            .map_err(anyhow::Error::msg)?,
+            "prost" => run_concurrent_threads(config.scale, move |n| {
+                prost_worker(config.test_data.clone(), n);
+                Ok(())
+            })
+            .map_err(anyhow::Error::msg)?,
+            "rust-protobuf" => run_concurrent_threads(config.scale, move |n| {
+                rust_protobuf_worker(config.test_data.clone(), n);
+                Ok(())
+            })
+            .map_err(anyhow::Error::msg)?,
+            "quick-protobuf" => run_concurrent_threads(config.scale, move |n| {
+                quick_protobuf_worker(config.test_data.clone(), n);
+                Ok(())
+            })
+            .map_err(anyhow::Error::msg)?,
             other => panic!("unknown parser: {}", other),
         },
         _ => panic!("invalid mode"),
     }
 
-    let end_stats = MemoryStats::collect()?;
-    let memory_diff = start_stats.diff(&end_stats);
-    config.output_json(&memory_diff);
     Ok(())
 }
+
+#[cfg_attr(feature = "hotpath", hotpath::measure)]
+fn pooled_worker(data: Bytes, iterations: usize) {
+    let parser = PooledParser;
+    for _ in 0..iterations {
+        let _ = parser.decode(data.clone());
+    }
+}
+
+#[cfg_attr(feature = "hotpath", hotpath::measure)]
+fn prost_worker(data: Bytes, iterations: usize) {
+    for _ in 0..iterations {
+        let _ = ProstWriteRequest::decode(data.clone());
+    }
+}
+
+#[cfg_attr(feature = "hotpath", hotpath::measure)]
+fn rust_protobuf_worker(data: Bytes, iterations: usize) {
+    for _ in 0..iterations {
+        let _ = benchmarks::rust_protobuf_remote_write::WriteRequest::parse_from_bytes(&data);
+    }
+}
+
+#[cfg_attr(feature = "hotpath", hotpath::measure)]
+fn quick_protobuf_worker(data: Bytes, iterations: usize) {
+    for _ in 0..iterations {
+        let mut reader = BytesReader::from_bytes(&data);
+        let _ =
+            benchmarks::quick_protobuf_remote_write::WriteRequest::from_reader(&mut reader, &data);
+    }
+}
+
+struct TotalAllocReporter;
+
+impl Reporter for TotalAllocReporter {
+    fn report(&self, metrics: &dyn MetricsProvider<'_>) -> Result<(), Box<dyn std::error::Error>> {
+        // In hotpath, each profiled function will output a row of [metrics](https://github.com/pawurb/hotpath/blob/main/hotpath-alloc-report.png):
+        // [calls, avg, pXX, total, %total], we only care about the `total`
+        // metric to verify our zero-allocation optimization. Since each function's
+        // metrics include those of its nested calls, we need to use the
+        // maximum value of the `total` field across all functions as
+        // the total memory allocated during decoding.
+        let mut max_total: u64 = 0;
+        for values in metrics.metric_data().values() {
+            if values.len() < 2 {
+                continue;
+            }
+            let total_idx = values.len() - 2;
+            if let MetricType::AllocBytes(bytes) = values[total_idx] {
+                max_total = std::cmp::max(max_total, bytes);
+            }
+        }
+        println!(
+            "Total allocated bytes (cumulative): {} ({})",
+            max_total,
+            human_bytes(max_total)
+        );
+        Ok(())
+    }
+}
diff --git a/src/benchmarks/src/bin/pool_stats.rs b/src/benchmarks/src/bin/pool_stats.rs
deleted file mode 100644
index 48115075..00000000
--- a/src/benchmarks/src/bin/pool_stats.rs
+++ /dev/null
@@ -1,82 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! evaluate the efficiency of the deadpool-backed object pool.
-
-use std::fs;
-
-use bytes::Bytes;
-use remote_write::{pooled_parser::PooledParser, pooled_types::POOL};
-use tikv_jemallocator::Jemalloc;
-use tokio::task::JoinHandle;
-
-#[global_allocator]
-static ALLOC: Jemalloc = Jemalloc;
-
-async fn run_concurrent_parsing(scale: usize) -> deadpool::Status {
-    let data = 
fs::read("../remote_write/tests/workloads/1709380533560664458.data")
-        .expect("test data load failed");
-    let data = Bytes::from(data);
-
-    let handles: Vec<JoinHandle<()>> = (0..scale)
-        .map(|_| {
-            let data = data.clone();
-            tokio::spawn(async move {
-                let parser = PooledParser;
-                let _ = parser
-                    .decode_async(data.clone())
-                    .await
-                    .expect("parse failed");
-            })
-        })
-        .collect();
-
-    for handle in handles {
-        handle.await.expect("task completion failed");
-    }
-
-    POOL.status()
-}
-
-#[tokio::main]
-async fn main() {
-    let scale_values = [1, 2, 5, 10, 20, 50, 100, 200, 500];
-
-    println!(
-        "{:<8} {:<10} {:<10} {:<10} {:<10}",
-        "Scale", "MaxSize", "PoolSize", "Available", "Waiting"
-    );
-    println!("{}", "=".repeat(50));
-
-    for &scale in &scale_values {
-        let status = run_concurrent_parsing(scale).await;
-
-        println!(
-            "{:<8} {:<10} {:<10} {:<10} {:<10}",
-            scale, status.max_size, status.size, status.available, 
status.waiting
-        );
-
-        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
-    }
-
-    println!("=== Final Pool Status ===");
-    let final_status = POOL.status();
-    println!("Max Pool Size: {}", final_status.max_size);
-    println!("Current Pool Size: {}", final_status.size);
-    println!("Available Objects: {}", final_status.available);
-    println!("Waiting Requests: {}", final_status.waiting);
-}
diff --git a/src/benchmarks/src/remote_write_bench.rs 
b/src/benchmarks/src/remote_write_bench.rs
index b5372da5..c6f8ca65 100644
--- a/src/benchmarks/src/remote_write_bench.rs
+++ b/src/benchmarks/src/remote_write_bench.rs
@@ -17,7 +17,7 @@
 
 //! remote write parser bench.
 
-use std::{fs, path::PathBuf};
+use std::{fs, path::PathBuf, sync::Arc};
 
 use bytes::Bytes;
 use pb_types::WriteRequest as ProstWriteRequest;
@@ -25,12 +25,12 @@ use prost::Message;
 use protobuf::Message as ProtobufMessage;
 use quick_protobuf::{BytesReader, MessageRead};
 use remote_write::pooled_parser::PooledParser;
-use tokio::task::JoinHandle;
 
 use crate::{
     config::RemoteWriteConfig,
     quick_protobuf_remote_write::WriteRequest as QuickProtobufWriteRequest,
     rust_protobuf_remote_write::WriteRequest as RustProtobufWriteRequest,
+    util::run_concurrent_threads,
 };
 
 pub struct RemoteWriteBench {
@@ -59,14 +59,13 @@ impl RemoteWriteBench {
     }
 
     // Hand-written pooled parser sequential bench.
-    pub async fn pooled_parser_sequential(&self, scale: usize) -> Result<(), 
String> {
+    pub fn pooled_parser_sequential(&self, scale: usize) -> Result<(), String> 
{
         let parser = PooledParser;
         for _ in 0..scale {
             let data = Bytes::from(self.raw_data.clone());
             let _ = parser
-                .decode_async(data.clone())
-                .await
-                .map_err(|e| format!("pooled sequential parse failed: {:?}", 
e))?;
+                .decode(data.clone())
+                .map_err(|e| format!("pooled sequential parse failed: 
{e:?}"))?;
         }
         Ok(())
     }
@@ -91,85 +90,55 @@ impl RemoteWriteBench {
     }
 
     // prost parser concurrent bench.
-    pub async fn prost_parser_concurrent(&self, scale: usize) -> Result<(), 
String> {
-        let join_handles: Vec<JoinHandle<Result<(), String>>> = (0..scale)
-            .map(|_| {
-                let raw_data = self.raw_data.clone();
-                tokio::spawn(async move {
-                    let data = Bytes::from(raw_data);
-                    ProstWriteRequest::decode(data)
-                        .map_err(|e| format!("prost concurrent parse failed: 
{}", e))?;
-                    Ok(())
-                })
-            })
-            .collect();
-
-        for join_handle in join_handles {
-            join_handle.await.unwrap()?;
-        }
-        Ok(())
+    pub fn prost_parser_concurrent(&self, scale: usize) -> Result<(), String> {
+        let raw = Arc::new(self.raw_data.clone());
+        run_concurrent_threads(scale, move |n| {
+            for _ in 0..n {
+                let data = Bytes::from((*raw).clone());
+                ProstWriteRequest::decode(data)
+                    .map_err(|e| format!("prost concurrent parse failed: {}", 
e))?;
+            }
+            Ok(())
+        })
     }
 
     // Hand-written pooled parser concurrent bench.
-    pub async fn pooled_parser_concurrent(&self, scale: usize) -> Result<(), 
String> {
-        let parser = PooledParser;
-        let join_handles: Vec<JoinHandle<Result<(), String>>> = (0..scale)
-            .map(|_| {
-                let parser = parser.clone();
-                let raw_data = self.raw_data.clone();
-                tokio::spawn(async move {
-                    let data = Bytes::from(raw_data);
-                    let _ = parser
-                        .decode_async(data.clone())
-                        .await
-                        .map_err(|e| format!("pooled concurrent parse failed: 
{:?}", e))?;
-                    Ok(())
-                })
-            })
-            .collect();
-
-        for join_handle in join_handles {
-            join_handle.await.unwrap()?;
-        }
-        Ok(())
+    pub fn pooled_parser_concurrent(&self, scale: usize) -> Result<(), String> 
{
+        let raw = Arc::new(self.raw_data.clone());
+        run_concurrent_threads(scale, move |n| {
+            let parser = PooledParser;
+            for _ in 0..n {
+                let data = Bytes::from((*raw).clone());
+                let _ = parser
+                    .decode(data.clone())
+                    .map_err(|e| format!("pooled concurrent parse failed: 
{e:?}"))?;
+            }
+            Ok(())
+        })
     }
 
     // quick-protobuf parser concurrent bench.
-    pub async fn quick_protobuf_parser_concurrent(&self, scale: usize) -> 
Result<(), String> {
-        let join_handles: Vec<tokio::task::JoinHandle<Result<(), String>>> = 
(0..scale)
-            .map(|_| {
-                let data = self.raw_data.clone();
-                tokio::spawn(async move {
-                    let mut reader = BytesReader::from_bytes(&data);
-                    QuickProtobufWriteRequest::from_reader(&mut reader, &data)
-                        .map_err(|e| format!("quick-protobuf concurrent parse 
failed: {}", e))?;
-                    Ok(())
-                })
-            })
-            .collect();
-
-        for join_handle in join_handles {
-            join_handle.await.unwrap()?;
-        }
-        Ok(())
+    pub fn quick_protobuf_parser_concurrent(&self, scale: usize) -> Result<(), 
String> {
+        let raw = Arc::new(self.raw_data.clone());
+        run_concurrent_threads(scale, move |n| {
+            for _ in 0..n {
+                let mut reader: BytesReader = BytesReader::from_bytes(&raw);
+                QuickProtobufWriteRequest::from_reader(&mut reader, &raw)
+                    .map_err(|e| format!("quick-protobuf concurrent parse 
failed: {}", e))?;
+            }
+            Ok(())
+        })
     }
 
     // rust-protobuf parser concurrent bench.
-    pub async fn rust_protobuf_parser_concurrent(&self, scale: usize) -> 
Result<(), String> {
-        let join_handles: Vec<tokio::task::JoinHandle<Result<(), String>>> = 
(0..scale)
-            .map(|_| {
-                let data = self.raw_data.clone();
-                tokio::spawn(async move {
-                    RustProtobufWriteRequest::parse_from_bytes(&data)
-                        .map_err(|e| format!("rust-protobuf concurrent parse 
failed: {}", e))?;
-                    Ok(())
-                })
-            })
-            .collect();
-
-        for join_handle in join_handles {
-            join_handle.await.unwrap()?;
-        }
-        Ok(())
+    pub fn rust_protobuf_parser_concurrent(&self, scale: usize) -> Result<(), 
String> {
+        let raw = Arc::new(self.raw_data.clone());
+        run_concurrent_threads(scale, move |n| {
+            for _ in 0..n {
+                RustProtobufWriteRequest::parse_from_bytes(&raw)
+                    .map_err(|e| format!("rust-protobuf concurrent parse 
failed: {}", e))?;
+            }
+            Ok(())
+        })
     }
 }
diff --git a/src/benchmarks/src/util.rs b/src/benchmarks/src/util.rs
index 4d74f785..92f32121 100644
--- a/src/benchmarks/src/util.rs
+++ b/src/benchmarks/src/util.rs
@@ -30,8 +30,6 @@ use serde::{
     de::{self, Visitor},
     Deserialize, Deserializer, Serialize, Serializer,
 };
-use serde_json::json;
-use tikv_jemalloc_ctl::{epoch, stats, thread};
 
 #[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Default)]
 pub struct ReadableDuration(pub Duration);
@@ -174,84 +172,6 @@ impl<'de> Deserialize<'de> for ReadableDuration {
     }
 }
 
-// Memory bench utilities.
-#[derive(Debug, Clone)]
-pub struct MemoryStats {
-    pub thread_allocated: u64,
-    pub thread_deallocated: u64,
-    pub allocated: u64,
-    pub active: u64,
-    pub metadata: u64,
-    pub mapped: u64,
-    pub resident: u64,
-    pub retained: u64,
-}
-
-#[derive(Debug, Clone)]
-pub struct MemoryStatsDiff {
-    pub thread_allocated_diff: i64,
-    pub thread_deallocated_diff: i64,
-    pub allocated: i64,
-    pub active: i64,
-    pub metadata: i64,
-    pub mapped: i64,
-    pub resident: i64,
-    pub retained: i64,
-}
-
-impl MemoryStats {
-    pub fn collect() -> Result<Self, String> {
-        epoch::advance().map_err(|e| format!("failed to advance jemalloc 
epoch: {}", e))?;
-
-        Ok(MemoryStats {
-            thread_allocated: thread::allocatedp::read()
-                .map_err(|e| format!("failed to read thread.allocatedp: {}", 
e))?
-                .get(),
-            thread_deallocated: thread::deallocatedp::read()
-                .map_err(|e| format!("failed to read thread.deallocatedp: {}", 
e))?
-                .get(),
-            allocated: stats::allocated::read()
-                .map_err(|e| format!("failed to read allocated: {}", e))?
-                .try_into()
-                .unwrap(),
-            active: stats::active::read()
-                .map_err(|e| format!("failed to read active: {}", e))?
-                .try_into()
-                .unwrap(),
-            metadata: stats::metadata::read()
-                .map_err(|e| format!("failed to read metadata: {}", e))?
-                .try_into()
-                .unwrap(),
-            mapped: stats::mapped::read()
-                .map_err(|e| format!("failed to read mapped: {}", e))?
-                .try_into()
-                .unwrap(),
-            resident: stats::resident::read()
-                .map_err(|e| format!("failed to read resident: {}", e))?
-                .try_into()
-                .unwrap(),
-            retained: stats::retained::read()
-                .map_err(|e| format!("failed to read retained: {}", e))?
-                .try_into()
-                .unwrap(),
-        })
-    }
-
-    pub fn diff(&self, other: &MemoryStats) -> MemoryStatsDiff {
-        MemoryStatsDiff {
-            thread_allocated_diff: other.thread_allocated as i64 - 
self.thread_allocated as i64,
-            thread_deallocated_diff: other.thread_deallocated as i64
-                - self.thread_deallocated as i64,
-            allocated: other.allocated as i64 - self.allocated as i64,
-            active: other.active as i64 - self.active as i64,
-            metadata: other.metadata as i64 - self.metadata as i64,
-            mapped: other.mapped as i64 - self.mapped as i64,
-            resident: other.resident as i64 - self.resident as i64,
-            retained: other.retained as i64 - self.retained as i64,
-        }
-    }
-}
-
 pub struct MemoryBenchConfig {
     pub test_data: Bytes,
     pub scale: usize,
@@ -274,22 +194,50 @@ impl MemoryBenchConfig {
             mode,
         }
     }
+}
 
-    pub fn output_json(&self, memory_diff: &MemoryStatsDiff) {
-        let result = json!({
-            "mode": self.mode,
-            "scale": self.scale,
-            "memory": {
-                "thread_allocated_diff": memory_diff.thread_allocated_diff,
-                "thread_deallocated_diff": memory_diff.thread_deallocated_diff,
-                "allocated": memory_diff.allocated,
-                "active": memory_diff.active,
-                "metadata": memory_diff.metadata,
-                "mapped": memory_diff.mapped,
-                "resident": memory_diff.resident,
-                "retained": memory_diff.retained
-            }
-        });
-        println!("{}", result);
+/// Run a workload concurrently with up to CPU core count threads.
+pub fn run_concurrent_threads<F>(scale: usize, worker: F) -> Result<(), String>
+where
+    F: Fn(usize) -> Result<(), String> + Send + Sync + 'static,
+{
+    let threads = std::cmp::min(scale, num_cpus::get());
+    if threads == 0 {
+        return Ok(());
+    }
+    let base = scale / threads;
+    let extra = scale % threads;
+    let worker = std::sync::Arc::new(worker);
+    let mut handles = Vec::with_capacity(threads);
+    for i in 0..threads {
+        let n = base + if i < extra { 1 } else { 0 };
+        if n == 0 {
+            continue;
+        }
+        let w = std::sync::Arc::clone(&worker);
+        handles.push(std::thread::spawn(move || (w)(n)));
+    }
+    for h in handles {
+        h.join().map_err(|_| "thread panicked".to_string())??;
+    }
+    Ok(())
+}
+
+/// Convert bytes number to a human-readable string.
+pub fn human_bytes(bytes: u64) -> String {
+    const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
+    if bytes == 0 {
+        return "0 B".to_string();
+    }
+    let mut size = bytes as f64;
+    let mut unit = 0usize;
+    while size >= 1024.0 && unit < UNITS.len() - 1 {
+        size /= 1024.0;
+        unit += 1;
+    }
+    if unit == 0 {
+        format!("{} {}", bytes, UNITS[unit])
+    } else {
+        format!("{:.1} {}", size, UNITS[unit])
     }
 }
diff --git a/src/remote_write/Cargo.toml b/src/remote_write/Cargo.toml
index 344308c4..e0295cad 100644
--- a/src/remote_write/Cargo.toml
+++ b/src/remote_write/Cargo.toml
@@ -30,10 +30,8 @@ unsafe-split = []
 
 [dependencies]
 anyhow = { workspace = true }
-async-trait = { workspace = true }
 bytes = { workspace = true }
-deadpool = { workspace = true }
+object-pool = { workspace = true }
 once_cell = { workspace = true }
 pb_types = { workspace = true }
 prost = { workspace = true }
-tokio = { workspace = true }
diff --git a/src/remote_write/README.md b/src/remote_write/README.md
index f41a00c5..173d6794 100644
--- a/src/remote_write/README.md
+++ b/src/remote_write/README.md
@@ -6,7 +6,7 @@ A hand-written [Prometheus Remote Write Request 
(V1)](https://prometheus.io/docs
 
 Key optimization techniques:
 
-- Object pooling backed by deadpool.
+- Object pooling backed by [object-pool](https://github.com/CJP10/object-pool).
 
 - `RepeatedField` data structures.
 
@@ -29,6 +29,12 @@ pip3 install tabulate matplotlib
 
 ### CPU Time
 
+#### Benchmark Logic
+
+We benchmarked both sequential and concurrent parsing scenarios. In the 
sequential case, we simply executed the parsing task `scale` times within a for 
loop. In the concurrent case, we spawned up to one system-level thread per CPU 
core (never more than `scale`) and evenly distributed the `scale` parsing tasks 
among them, with each thread performing sequential parsing in a for loop.
+
+**Note**: Since Go has no direct equivalent of Rust's `std::thread::spawn` for 
creating system-level threads, we omitted the concurrent benchmark for Go.
+
 #### Steps
 
 Navigate to the benchmarks directory:
@@ -58,7 +64,7 @@ cd VictoriaMetrics/lib/prompb
 vim prom_decode_bench_test.go
 ```
 
-and add the following code (please change the path of 
1709380533560664458.data):
+and add the following code (please change the path of 
`1709380533560664458.data`):
 
 ```go
 package prompb
@@ -142,29 +148,6 @@ func benchDecoderSequential(decoder Decoder, data []byte, 
n int) error {
     return nil
 }
 
-// Concurrent benchmark.
-func benchDecoderConcurrent(decoder Decoder, data []byte, workers int) error {
-    results := make(chan error, workers)
-
-    // Spawn workers (similar to tokio::spawn in Rust).
-    for w := 0; w < workers; w++ {
-        go func() {
-            clonedDecoder := decoder.Clone()
-            clonedDecoder.Reset()
-            err := clonedDecoder.Parse(data)
-            results <- err
-        }()
-    }
-
-    // Wait for all workers to complete (similar to join_handle.await).
-    for w := 0; w < workers; w++ {
-        if err := <-results; err != nil {
-            return err
-        }
-    }
-    return nil
-}
-
 func BenchmarkSequentialParse(b *testing.B) {
     data, err := getTestDataPath()
     if err != nil {
@@ -191,33 +174,6 @@ func BenchmarkSequentialParse(b *testing.B) {
         }
     }
 }
-
-func BenchmarkConcurrentParse(b *testing.B) {
-    data, err := getTestDataPath()
-    if err != nil {
-        b.Skipf("test data file not found: %v", err)
-    }
-
-    decoders := map[string]Decoder{
-        "pooled": NewPooledDecoder(),
-        "nopool": NewNoPoolDecoder(),
-    }
-
-    workers := []int{1, 5, 10, 20, 100}
-
-    for decoderName, decoder := range decoders {
-        for _, w := range workers {
-            b.Run(fmt.Sprintf("%s/%d", decoderName, w), func(b *testing.B) {
-                b.ResetTimer()
-                for i := 0; i < b.N; i++ {
-                    if err := benchDecoderConcurrent(decoder, data, w); err != 
nil {
-                        b.Fatalf("failed to parse: %v", err)
-                    }
-                }
-            })
-        }
-    }
-}
 ```
 
 Execute the Go benchmarks:
@@ -232,16 +188,18 @@ Test results are as follows:
 
 ![Sequential 
Performance](../../docs/assets/remote-write-sequential-performance.png)
 
-In sequential parsing scenarios, the hand-written pooled parsers (with and 
without unsafe optimization) achieve the best performance across all scales 
compared to other Rust parsers. The unsafe optimization provides nearly 50% 
performance improvement.
-
 ![Concurrent 
Performance](../../docs/assets/remote-write-concurrent-performance.png)
 
-**Note**: Due to the nature of concurrent execution, concurrent parsing 
benchmark results may vary (sometimes dramatically) across different runs. 
However, we can still draw an overall conclusion.
-
-In concurrent parsing scenarios, from an overall perspective, the hand-written 
pooled parsers (with and without unsafe optimization) still achieve the best 
performance compared to other Rust parsers. The unsafe optimization continues 
to provide performance improvements.
+In both scenarios, the hand-written pooled parsers (with and without unsafe 
optimization) achieve the best performance across all scales compared to other 
Rust parsers. The unsafe optimization provides nearly 50% performance 
improvement.
 
 ### Memory Allocation
 
+#### Benchmark Logic
+
+We use [hotpath](https://github.com/pawurb/hotpath) to measure the memory 
allocation performance of different parsers.
+
+#### Steps
+
 Navigate to the benchmarks directory:
 
 ```shell
@@ -251,40 +209,22 @@ cd src/benchmarks
 Run memory allocation benchmarks:
 
 ```shell
-python3 remote_write_memory_bench.py --mode sequential --scale 10
+python3 remote_write_memory_bench.py --mode sequential --scale 64
 ```
 
 Or enable unsafe optimization:
 
 ```shell
-python3 remote_write_memory_bench.py --mode concurrent --scale 10 --unsafe
+python3 remote_write_memory_bench.py --mode concurrent --scale 64 --unsafe
 ```
 
-**Note**: Sequential and concurrent mode results are similar due to the 
enforced `#[tokio::main(flavor = "current_thread")]` configuration. This 
constraint is necessary because Jemalloc's `thread::allocatedp` and 
`thread::deallocatedp` statistics can only track single-threaded allocations 
accurately.
-
-We focus on the 
[allocatedp](https://docs.rs/tikv-jemalloc-ctl/0.6.0/tikv_jemalloc_ctl/thread/struct.allocatedp.html)
 value to verify our zero-allocation parsing efforts, since it represents the 
number of bytes that **have ever been** allocated by the thread.
+#### Results
 
 The results are as follows:
 
 ![Memory](../../docs/assets/remote-write-memory-performance.png)
 
-The hand-written pooled parser allocates minimal memory compared to other Rust 
parsers. Note that the difference between `ThreadAlloc` and `ThreadDealloc` in 
the pooled decoder is expected since we gather statistics right before the 
program terminates and objects remain in the pool (not freed) at that time.
-
-### Object Pool Efficiency
-
-Navigate to the benchmarks directory:
-
-```shell
-cd src/benchmarks
-```
-
-Analyze pool utilization:
-
-```shell
-cargo run --bin pool_stats --release
-```
-
-Our testing finds that only 8 objects in the pool are sufficient to handle 500 
concurrent parsing operations.
+The hand-written pooled parser allocates the least memory in total 
compared to other Rust parsers, demonstrating the effectiveness of our 
zero-allocation optimization.
 
 ## Acknowledgements
 
diff --git a/src/remote_write/src/pooled_parser.rs 
b/src/remote_write/src/pooled_parser.rs
index 8b2c62fb..3b90b751 100644
--- a/src/remote_write/src/pooled_parser.rs
+++ b/src/remote_write/src/pooled_parser.rs
@@ -25,10 +25,11 @@
 
 use anyhow::Result;
 use bytes::Bytes;
+use object_pool::ReusableOwned;
 
 use crate::{
     pb_reader::read_write_request,
-    pooled_types::{WriteRequest, WriteRequestManager, POOL},
+    pooled_types::{WriteRequest, POOL},
     repeated_field::Clear,
 };
 
@@ -40,26 +41,13 @@ impl PooledParser {
         Self
     }
 
-    /// Decode a [`WriteRequest`] from the buffer and return it.
-    pub fn decode(&self, buf: Bytes) -> Result<WriteRequest> {
-        // Cannot get a WriteRequest instance from the pool in sync functions.
-        let mut request = WriteRequest::default();
-        read_write_request(buf, &mut request)?;
-        Ok(request)
-    }
-
-    /// Decode a [`WriteRequest`] from the buffer and return a pooled object.
+    /// Decode a [`WriteRequest`] from the buffer.
     ///
     /// This method will reuse a [`WriteRequest`] instance from the object
     /// pool. After the returned object is dropped, it will be returned to the
-    pub async fn decode_async(
-        &self,
-        buf: Bytes,
-    ) -> Result<deadpool::managed::Object<WriteRequestManager>> {
-        let mut pooled_request = POOL
-            .get()
-            .await
-            .map_err(|e| anyhow::anyhow!("failed to get object from pool: 
{e:?}"))?;
+    /// pool.
+    pub fn decode(&self, buf: Bytes) -> Result<ReusableOwned<WriteRequest>> {
+        let mut pooled_request = POOL.pull_owned(WriteRequest::default);
         pooled_request.clear();
         read_write_request(buf, &mut pooled_request)?;
         Ok(pooled_request)
diff --git a/src/remote_write/src/pooled_types.rs 
b/src/remote_write/src/pooled_types.rs
index 427eb607..b0ba6e2d 100644
--- a/src/remote_write/src/pooled_types.rs
+++ b/src/remote_write/src/pooled_types.rs
@@ -15,9 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use async_trait::async_trait;
+use std::sync::Arc;
+
 use bytes::Bytes;
-use deadpool::managed::{Manager, Metrics, Pool, RecycleResult};
+use object_pool::Pool;
 use once_cell::sync::Lazy;
 
 use crate::repeated_field::{Clear, RepeatedField};
@@ -160,33 +161,8 @@ impl Clear for WriteRequest {
     }
 }
 
-/// A deadpool manager for PooledWriteRequest.
-pub struct WriteRequestManager;
-
-#[async_trait]
-impl Manager for WriteRequestManager {
-    type Error = ();
-    type Type = WriteRequest;
-
-    async fn create(&self) -> Result<Self::Type, Self::Error> {
-        Ok(WriteRequest::default())
-    }
-
-    async fn recycle(
-        &self,
-        _obj: &mut Self::Type,
-        _metrics: &Metrics,
-    ) -> RecycleResult<Self::Error> {
-        // We will reset the object after acquiring it.
-        Ok(())
-    }
-}
-
-const POOL_SIZE: usize = 64; // Maximum number of objects in the pool.
+const POOL_SIZE: usize = 16; // Maximum number of objects in the pool.
 
-pub static POOL: Lazy<Pool<WriteRequestManager>> = Lazy::new(|| {
-    Pool::builder(WriteRequestManager)
-        .max_size(POOL_SIZE)
-        .build()
-        .unwrap()
-});
+/// Global thread-safe object pool for `WriteRequest`.
+pub static POOL: Lazy<Arc<Pool<WriteRequest>>> =
+    Lazy::new(|| Arc::new(Pool::new(POOL_SIZE, WriteRequest::default)));
diff --git a/src/remote_write/tests/equivalence_test.rs 
b/src/remote_write/tests/equivalence_test.rs
index e545ecd4..96b2c7bd 100644
--- a/src/remote_write/tests/equivalence_test.rs
+++ b/src/remote_write/tests/equivalence_test.rs
@@ -21,13 +21,12 @@
 //!
 //! Test with `--features unsafe-split` to enable the unsafe optimization.
 
-use std::{fs, sync::Arc};
+use std::{fs, sync::Arc, thread};
 
 use bytes::Bytes;
 use pb_types::{Exemplar, Label, MetricMetadata, Sample, TimeSeries, 
WriteRequest};
 use prost::Message;
 use remote_write::pooled_parser::PooledParser;
-use tokio::task::JoinHandle;
 
 const ITERATIONS: usize = 50;
 
@@ -47,13 +46,10 @@ fn parse_with_prost(data: &Bytes) -> WriteRequest {
     WriteRequest::decode(data.clone()).expect("prost decode failed")
 }
 
-async fn parse_with_pooled(data: &Bytes) -> WriteRequest {
+fn parse_with_pooled(data: &Bytes) -> WriteRequest {
     let data_copy = data.clone();
     let parser = PooledParser;
-    let pooled_request = parser
-        .decode_async(data_copy)
-        .await
-        .expect("pooled decode failed");
+    let pooled_request = parser.decode(data_copy).expect("pooled decode 
failed");
 
     // Convert pooled types to pb_types to compare with prost.
     let mut write_request = WriteRequest {
@@ -118,8 +114,8 @@ async fn parse_with_pooled(data: &Bytes) -> WriteRequest {
     write_request
 }
 
-#[tokio::test]
-async fn test_sequential_correctness() {
+#[test]
+fn test_sequential_correctness() {
     let (data1, data2) = load_test_data();
     let datasets = [&data1, &data2];
 
@@ -128,7 +124,7 @@ async fn test_sequential_correctness() {
         let data = datasets[data_index];
 
         let prost_result = parse_with_prost(data);
-        let pooled_result = parse_with_pooled(data).await;
+        let pooled_result = parse_with_pooled(data);
 
         assert_eq!(
             &prost_result, &pooled_result,
@@ -138,19 +134,19 @@ async fn test_sequential_correctness() {
     }
 }
 
-#[tokio::test]
-async fn test_concurrent_correctness() {
+#[test]
+fn test_concurrent_correctness() {
     let (data1, data2) = load_test_data();
     let data1 = Arc::new(data1);
     let data2 = Arc::new(data2);
 
-    let mut handles: Vec<JoinHandle<()>> = Vec::new();
+    let mut handles = Vec::new();
 
     for iteration in 0..ITERATIONS {
         let data1_clone = Arc::clone(&data1);
         let data2_clone = Arc::clone(&data2);
 
-        let handle = tokio::spawn(async move {
+        let handle = thread::spawn(move || {
             let data_index = iteration % 2;
             let data = if data_index == 0 {
                 &*data1_clone
@@ -159,7 +155,7 @@ async fn test_concurrent_correctness() {
             };
 
             let prost_result = parse_with_prost(data);
-            let pooled_result = parse_with_pooled(data).await;
+            let pooled_result = parse_with_pooled(data);
 
             assert_eq!(
                 &prost_result, &pooled_result,
@@ -172,6 +168,6 @@ async fn test_concurrent_correctness() {
     }
 
     for handle in handles {
-        handle.await.expect("task completion failed");
+        handle.join().expect("thread completion failed");
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to