This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 9cec5636 feat: add hand-written Prometheus remote write request parser 
(#1628)
9cec5636 is described below

commit 9cec5636a80875a3d35a007d41cf5ee98fcf2dbb
Author: Peiyang He <[email protected]>
AuthorDate: Tue Oct 28 22:49:42 2025 +0800

    feat: add hand-written Prometheus remote write request parser (#1628)
    
    ## Rationale
    
    A hand-written remote write request parser to replace prost, with the
    primary goal of achieving zero-allocation parsing.
    
    ## Detailed Changes
    
    - The new `remote_write` directory contains the core implementation of
    the hand-written parser.
    
    - Add benchmarks.
    
    - Add a .proto file in `pb_types`.
    
    - Fix CI errors.
    
    ## Test Plan
    
    - Unit tests can be found at the end of the `pb_reader.rs`.
    
    - A comprehensive equivalence test is available in
    `equivalence_test.rs`, which validates the correctness of the
    hand-written parser by comparing its output with that of the prost
    auto-generated parser.
---
 .github/workflows/ci.yml                           |   2 +
 .gitignore                                         |   1 +
 Cargo.lock                                         | 238 ++++++++-
 Cargo.toml                                         |  68 +--
 LICENSE                                            |   4 +-
 .../assets/remote-write-concurrent-performance.png | Bin 0 -> 578747 bytes
 docs/assets/remote-write-memory-performance.png    | Bin 0 -> 119032 bytes
 .../assets/remote-write-sequential-performance.png | Bin 0 -> 620542 bytes
 licenserc.toml                                     |   2 +
 src/benchmarks/Cargo.toml                          |  25 +
 src/benchmarks/benches/bench.rs                    | 107 +++-
 src/benchmarks/build.rs                            | 106 ++++
 src/benchmarks/config.toml                         |   5 +
 src/benchmarks/remote_write_memory_bench.py        | 237 +++++++++
 src/benchmarks/src/bin/parser_mem.rs               | 138 +++++
 src/benchmarks/src/bin/pool_stats.rs               |  82 +++
 src/benchmarks/src/config.rs                       |   8 +
 src/benchmarks/src/lib.rs                          |  15 +-
 src/benchmarks/src/remote_write_bench.rs           | 175 +++++++
 src/benchmarks/src/util.rs                         | 125 +++++
 src/pb_types/build.rs                              |   5 +-
 src/pb_types/protos/remote_write.proto             |  77 +++
 src/pb_types/src/lib.rs                            |   5 +
 src/{benchmarks => remote_write}/Cargo.toml        |  23 +-
 src/remote_write/README.md                         | 291 +++++++++++
 src/{benchmarks => remote_write}/src/lib.rs        |   9 +-
 src/remote_write/src/pb_reader.rs                  | 565 +++++++++++++++++++++
 src/remote_write/src/pooled_parser.rs              |  73 +++
 src/remote_write/src/pooled_types.rs               | 192 +++++++
 src/remote_write/src/repeated_field.rs             | 534 +++++++++++++++++++
 src/remote_write/tests/equivalence_test.rs         | 177 +++++++
 .../tests/workloads/1709380533560664458.data       | Bin 0 -> 1690278 bytes
 .../tests/workloads/1709380533705807779.data       | Bin 0 -> 1687445 bytes
 33 files changed, 3214 insertions(+), 75 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index ea01d87e..ee55523d 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -61,6 +61,7 @@ jobs:
         run: |
           sudo apt update
           sudo apt install --yes protobuf-compiler
+          cargo install pb-rs
       - name: Install check binaries
         run: |
           cargo install --git https://github.com/DevinR528/cargo-sort --rev 
55ec890 --locked
@@ -82,6 +83,7 @@ jobs:
         run: |
           sudo apt update
           sudo apt install --yes protobuf-compiler
+          cargo install pb-rs
       - uses: Swatinem/rust-cache@v2
       - name: Run Unit Tests
         run: |
diff --git a/.gitignore b/.gitignore
index f3b4b08a..21aab843 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,5 +9,6 @@ integration_tests/dist_query/output
 .project
 .tools
 bin
+!src/benchmarks/src/bin
 coverage.txt
 tini
diff --git a/Cargo.lock b/Cargo.lock
index 42ca6c48..a97698ea 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -546,7 +546,7 @@ dependencies = [
  "memchr",
  "num",
  "regex",
- "regex-syntax",
+ "regex-syntax 0.8.4",
 ]
 
 [[package]]
@@ -633,9 +633,19 @@ dependencies = [
  "columnar_storage",
  "common",
  "criterion",
+ "deadpool",
+ "num_cpus",
  "pb_types",
  "prost",
+ "protobuf",
+ "protobuf-codegen",
+ "quick-protobuf",
+ "remote_write",
  "serde",
+ "serde_json",
+ "tikv-jemalloc-ctl",
+ "tikv-jemallocator",
+ "tokio",
  "toml",
  "tracing",
  "tracing-subscriber",
@@ -1410,7 +1420,7 @@ dependencies = [
  "itertools 0.13.0",
  "log",
  "paste",
- "regex-syntax",
+ "regex-syntax 0.8.4",
 ]
 
 [[package]]
@@ -1524,6 +1534,24 @@ dependencies = [
  "strum",
 ]
 
+[[package]]
+name = "deadpool"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490"
+dependencies = [
+ "async-trait",
+ "deadpool-runtime",
+ "num_cpus",
+ "tokio",
+]
+
+[[package]]
+name = "deadpool-runtime"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b"
+
 [[package]]
 name = "deranged"
 version = "0.3.11"
@@ -1842,6 +1870,15 @@ version = "0.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
 
+[[package]]
+name = "home"
+version = "0.5.11"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf"
+dependencies = [
+ "windows-sys 0.59.0",
+]
+
 [[package]]
 name = "http"
 version = "0.2.12"
@@ -2152,11 +2189,11 @@ dependencies = [
 
 [[package]]
 name = "matchers"
-version = "0.2.0"
+version = "0.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
+checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
 dependencies = [
- "regex-automata",
+ "regex-automata 0.1.10",
 ]
 
 [[package]]
@@ -2226,11 +2263,12 @@ checksum = 
"defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
 
 [[package]]
 name = "nu-ansi-term"
-version = "0.50.1"
+version = "0.46.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "d4a28e057d01f97e61255210fcff094d74ed0466038633e95017f5beb68e4399"
+checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
 dependencies = [
- "windows-sys 0.52.0",
+ "overload",
+ "winapi",
 ]
 
 [[package]]
@@ -2383,6 +2421,12 @@ dependencies = [
  "num-traits",
 ]
 
+[[package]]
+name = "overload"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
+
 [[package]]
 name = "parking_lot"
 version = "0.12.3"
@@ -2672,6 +2716,66 @@ dependencies = [
  "prost",
 ]
 
+[[package]]
+name = "protobuf"
+version = "3.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4"
+dependencies = [
+ "once_cell",
+ "protobuf-support",
+ "thiserror",
+]
+
+[[package]]
+name = "protobuf-codegen"
+version = "3.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "5d3976825c0014bbd2f3b34f0001876604fe87e0c86cd8fa54251530f1544ace"
+dependencies = [
+ "anyhow",
+ "once_cell",
+ "protobuf",
+ "protobuf-parse",
+ "regex",
+ "tempfile",
+ "thiserror",
+]
+
+[[package]]
+name = "protobuf-parse"
+version = "3.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b4aeaa1f2460f1d348eeaeed86aea999ce98c1bded6f089ff8514c9d9dbdc973"
+dependencies = [
+ "anyhow",
+ "indexmap",
+ "log",
+ "protobuf",
+ "protobuf-support",
+ "tempfile",
+ "thiserror",
+ "which",
+]
+
+[[package]]
+name = "protobuf-support"
+version = "3.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6"
+dependencies = [
+ "thiserror",
+]
+
+[[package]]
+name = "quick-protobuf"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "9d6da84cc204722a989e01ba2f6e1e276e190f22263d0cb6ce8526fcdb0d2e1f"
+dependencies = [
+ "byteorder",
+]
+
 [[package]]
 name = "quote"
 version = "1.0.37"
@@ -2748,8 +2852,17 @@ checksum = 
"4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619"
 dependencies = [
  "aho-corasick",
  "memchr",
- "regex-automata",
- "regex-syntax",
+ "regex-automata 0.4.7",
+ "regex-syntax 0.8.4",
+]
+
+[[package]]
+name = "regex-automata"
+version = "0.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
+dependencies = [
+ "regex-syntax 0.6.29",
 ]
 
 [[package]]
@@ -2760,7 +2873,7 @@ checksum = 
"38caf58cc5ef2fed281f89292ef23f6365465ed9a41b7a7754eb4e26496c92df"
 dependencies = [
  "aho-corasick",
  "memchr",
- "regex-syntax",
+ "regex-syntax 0.8.4",
 ]
 
 [[package]]
@@ -2769,12 +2882,32 @@ version = "0.1.6"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a"
 
+[[package]]
+name = "regex-syntax"
+version = "0.6.29"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
+
 [[package]]
 name = "regex-syntax"
 version = "0.8.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b"
 
+[[package]]
+name = "remote_write"
+version = "2.2.0-alpha"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "bytes",
+ "deadpool",
+ "once_cell",
+ "pb_types",
+ "prost",
+ "tokio",
+]
+
 [[package]]
 name = "rustc-demangle"
 version = "0.1.24"
@@ -3178,6 +3311,37 @@ dependencies = [
  "ordered-float",
 ]
 
+[[package]]
+name = "tikv-jemalloc-ctl"
+version = "0.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "619bfed27d807b54f7f776b9430d4f8060e66ee138a28632ca898584d462c31c"
+dependencies = [
+ "libc",
+ "paste",
+ "tikv-jemalloc-sys",
+]
+
+[[package]]
+name = "tikv-jemalloc-sys"
+version = "0.5.4+5.3.0-patched"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "9402443cb8fd499b6f327e40565234ff34dbda27460c5b47db0db77443dd85d1"
+dependencies = [
+ "cc",
+ "libc",
+]
+
+[[package]]
+name = "tikv-jemallocator"
+version = "0.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "965fe0c26be5c56c94e38ba547249074803efd52adfb66de62107d95aab3eaca"
+dependencies = [
+ "libc",
+ "tikv-jemalloc-sys",
+]
+
 [[package]]
 name = "time"
 version = "0.3.37"
@@ -3323,9 +3487,9 @@ dependencies = [
 
 [[package]]
 name = "tracing"
-version = "0.1.41"
+version = "0.1.40"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
+checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
 dependencies = [
  "log",
  "pin-project-lite",
@@ -3335,9 +3499,9 @@ dependencies = [
 
 [[package]]
 name = "tracing-attributes"
-version = "0.1.30"
+version = "0.1.27"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -3346,9 +3510,9 @@ dependencies = [
 
 [[package]]
 name = "tracing-core"
-version = "0.1.34"
+version = "0.1.32"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678"
+checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
 dependencies = [
  "once_cell",
  "valuable",
@@ -3367,14 +3531,14 @@ dependencies = [
 
 [[package]]
 name = "tracing-subscriber"
-version = "0.3.20"
+version = "0.3.18"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5"
+checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b"
 dependencies = [
  "matchers",
  "nu-ansi-term",
  "once_cell",
- "regex-automata",
+ "regex",
  "sharded-slab",
  "smallvec",
  "thread_local",
@@ -3552,6 +3716,34 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "which"
+version = "4.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7"
+dependencies = [
+ "either",
+ "home",
+ "once_cell",
+ "rustix",
+]
+
+[[package]]
+name = "winapi"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
+dependencies = [
+ "winapi-i686-pc-windows-gnu",
+ "winapi-x86_64-pc-windows-gnu",
+]
+
+[[package]]
+name = "winapi-i686-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
+
 [[package]]
 name = "winapi-util"
 version = "0.1.9"
@@ -3561,6 +3753,12 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
+[[package]]
+name = "winapi-x86_64-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
+
 [[package]]
 name = "windows-core"
 version = "0.52.0"
diff --git a/Cargo.toml b/Cargo.toml
index 35e3a56b..b4bab12b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,60 +15,64 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[workspace.package]
-version = "2.2.0-alpha"
-authors = ["Apache HoraeDB(incubating) <[email protected]>"]
-edition = "2021"
-license = "Apache-2.0"
-repository = "https://github.com/apache/horaedb";
-homepage = "https://horaedb.apache.org/";
-description = "A high-performance, distributed, cloud native time-series 
database."
-
 [workspace]
 resolver = "2"
 members = [
     "src/benchmarks",
-    "src/columnar_storage"
-,
+    "src/columnar_storage",
     "src/common",
     "src/metric_engine",
     "src/pb_types",
+    "src/remote_write",
     "src/server"
 ]
 
+[workspace.package]
+version = "2.2.0-alpha"
+authors = ["Apache HoraeDB(incubating) <[email protected]>"]
+edition = "2021"
+license = "Apache-2.0"
+repository = "https://github.com/apache/horaedb";
+homepage = "https://horaedb.apache.org/";
+description = "A high-performance, distributed, cloud native time-series 
database."
+
 [workspace.dependencies]
 anyhow = { version = "1.0" }
-seahash = { version = "4" }
-metric_engine = { path = "src/metric_engine" }
+arrow = { version = "53", features = ["prettyprint"] }
+arrow-schema = "53"
+async-scoped = { version = "0.9.0", features = ["use-tokio"] }
+async-stream = "0.3"
+async-trait = "0.1"
+byteorder = "1"
+bytes = "1"
+bytesize = "1"
+clap = "4"
 columnar_storage = { path = "src/columnar_storage" }
 common = { path = "src/common" }
-thiserror = "1"
-bytes = "1"
-byteorder = "1"
+criterion = "0.5"
 datafusion = "43"
-parquet = { version = "53" }
+deadpool = "0.10"
+futures = "0.3"
+itertools = "0.3"
+lazy_static = "1"
+metric_engine = { path = "src/metric_engine" }
 object_store = { version = "0.11" }
+once_cell = "1"
+parquet = { version = "53" }
 pb_types = { path = "src/pb_types" }
 prost = { version = "0.13" }
-arrow = { version = "53", features = ["prettyprint"] }
-bytesize = "1"
-clap = "4"
-arrow-schema = "53"
-tokio = { version = "1", features = ["full"] }
-async-trait = "0.1"
-async-stream = "0.3"
-futures = "0.3"
+remote_write = { path = "src/remote_write" }
+seahash = { version = "4" }
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
 temp-dir = "0.1"
-itertools = "0.3"
-lazy_static = "1"
+test-log = "0.2"
+thiserror = "1"
+tokio = { version = "1", features = ["full"] }
+toml = "0.8"
 tracing = "0.1"
 tracing-subscriber = "0.3"
-async-scoped = { version = "0.9.0", features = ["use-tokio"] }
-test-log = "0.2"
 uuid = "1"
-criterion = "0.5"
-serde = { version = "1.0", features = ["derive"] }
-toml = "0.8"
 
 # This profile optimizes for good runtime performance.
 [profile.release]
diff --git a/LICENSE b/LICENSE
index 866183b7..c31a8935 100644
--- a/LICENSE
+++ b/LICENSE
@@ -237,6 +237,8 @@ The following components are provided under the MIT 
License. See project link fo
 The text of each license is also included in licenses/LICENSE-[project].txt
 
 * consistent(https://github.com/buraksezer/consistent)
+* prom-write-request-bench(https://github.com/v0y4g3r/prom-write-request-bench)
 
 Files 
horaemeta/server/coordinator/scheduler/nodepicker/hash/consistent_uniform.go,
-horaemeta/server/coordinator/scheduler/nodepicker/hash/consistent_uniform_test.go
 are modified from consistent.
\ No newline at end of file
+horaemeta/server/coordinator/scheduler/nodepicker/hash/consistent_uniform_test.go
 are modified from consistent.
+File src/remote_write/src/repeated_field.rs from prom-write-request-bench.
\ No newline at end of file
diff --git a/docs/assets/remote-write-concurrent-performance.png 
b/docs/assets/remote-write-concurrent-performance.png
new file mode 100644
index 00000000..15d185a1
Binary files /dev/null and 
b/docs/assets/remote-write-concurrent-performance.png differ
diff --git a/docs/assets/remote-write-memory-performance.png 
b/docs/assets/remote-write-memory-performance.png
new file mode 100644
index 00000000..c5859820
Binary files /dev/null and b/docs/assets/remote-write-memory-performance.png 
differ
diff --git a/docs/assets/remote-write-sequential-performance.png 
b/docs/assets/remote-write-sequential-performance.png
new file mode 100644
index 00000000..a401c285
Binary files /dev/null and 
b/docs/assets/remote-write-sequential-performance.png differ
diff --git a/licenserc.toml b/licenserc.toml
index 9bc60b63..54129a8f 100644
--- a/licenserc.toml
+++ b/licenserc.toml
@@ -20,4 +20,6 @@ headerPath = "Apache-2.0-ASF.txt"
 excludes = [
     # Forked
     "src/common/src/size_ext.rs",
+    "src/remote_write/src/repeated_field.rs",
+    "src/pb_types/protos/remote_write.proto",
 ]
diff --git a/src/benchmarks/Cargo.toml b/src/benchmarks/Cargo.toml
index a6c017bf..de738d01 100644
--- a/src/benchmarks/Cargo.toml
+++ b/src/benchmarks/Cargo.toml
@@ -25,19 +25,44 @@ repository.workspace = true
 homepage.workspace = true
 description.workspace = true
 
+[[bin]]
+name = "parser_mem"
+path = "src/bin/parser_mem.rs"
+
+[[bin]]
+name = "pool_stats"
+path = "src/bin/pool_stats.rs"
+
+[features]
+unsafe-split = ["remote_write/unsafe-split"]
+
 [dependencies]
 bytes = { workspace = true }
 columnar_storage = { workspace = true }
 common = { workspace = true }
+deadpool = { workspace = true }
 pb_types = { workspace = true }
 prost = { workspace = true }
+protobuf = "3.7"
+quick-protobuf = "0.8"
+remote_write = { workspace = true }
 serde = { workspace = true }
+serde_json = { workspace = true }
+tikv-jemalloc-ctl = "0.5"
+tokio = { workspace = true }
 toml = { workspace = true }
 tracing = { workspace = true }
 tracing-subscriber = { workspace = true }
 
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.5"
+
+[build-dependencies]
+protobuf-codegen = "3.7"
+
 [dev-dependencies]
 criterion = { workspace = true }
+num_cpus = "1.16"
 
 [[bench]]
 name = "bench"
diff --git a/src/benchmarks/benches/bench.rs b/src/benchmarks/benches/bench.rs
index 5ce94444..b932a441 100644
--- a/src/benchmarks/benches/bench.rs
+++ b/src/benchmarks/benches/bench.rs
@@ -22,6 +22,7 @@ use std::{cell::RefCell, sync::Once};
 use benchmarks::{
     config::{self, BenchConfig},
     encoding_bench::EncodingBench,
+    remote_write_bench::RemoteWriteBench,
 };
 use criterion::*;
 
@@ -56,10 +57,114 @@ fn bench_manifest_encoding(c: &mut Criterion) {
     group.finish();
 }
 
+fn bench_remote_write(c: &mut Criterion) {
+    let config = init_bench();
+
+    let sequential_scales = config.remote_write.sequential_scales.clone();
+    let concurrent_scales = config.remote_write.concurrent_scales.clone();
+    let bench = RefCell::new(RemoteWriteBench::new(config.remote_write));
+
+    let rt = tokio::runtime::Builder::new_multi_thread()
+        .worker_threads(num_cpus::get())
+        .enable_all()
+        .build()
+        .unwrap();
+
+    // Sequential parse bench.
+    let mut group = c.benchmark_group("remote_write_sequential");
+
+    for &n in &sequential_scales {
+        group.bench_with_input(
+            BenchmarkId::new("prost", n),
+            &(&bench, n),
+            |b, (bench, scale)| {
+                let bench = bench.borrow();
+                b.iter(|| bench.prost_parser_sequential(*scale).unwrap())
+            },
+        );
+
+        group.bench_with_input(
+            BenchmarkId::new("pooled", n),
+            &(&bench, &rt, n),
+            |b, (bench, rt, scale)| {
+                let bench = bench.borrow();
+                b.iter(|| 
rt.block_on(bench.pooled_parser_sequential(*scale)).unwrap())
+            },
+        );
+
+        group.bench_with_input(
+            BenchmarkId::new("quick_protobuf", n),
+            &(&bench, n),
+            |b, (bench, scale)| {
+                let bench = bench.borrow();
+                b.iter(|| 
bench.quick_protobuf_parser_sequential(*scale).unwrap())
+            },
+        );
+
+        group.bench_with_input(
+            BenchmarkId::new("rust_protobuf", n),
+            &(&bench, n),
+            |b, (bench, scale)| {
+                let bench = bench.borrow();
+                b.iter(|| 
bench.rust_protobuf_parser_sequential(*scale).unwrap())
+            },
+        );
+    }
+    group.finish();
+
+    // Concurrent parse bench.
+    let mut group = c.benchmark_group("remote_write_concurrent");
+
+    for &scale in &concurrent_scales {
+        group.bench_with_input(
+            BenchmarkId::new("prost", scale),
+            &(&bench, &rt, scale),
+            |b, (bench, rt, scale)| {
+                let bench = bench.borrow();
+                b.iter(|| 
rt.block_on(bench.prost_parser_concurrent(*scale)).unwrap())
+            },
+        );
+
+        group.bench_with_input(
+            BenchmarkId::new("pooled", scale),
+            &(&bench, &rt, scale),
+            |b, (bench, rt, scale)| {
+                let bench = bench.borrow();
+                b.iter(|| 
rt.block_on(bench.pooled_parser_concurrent(*scale)).unwrap())
+            },
+        );
+
+        group.bench_with_input(
+            BenchmarkId::new("quick_protobuf", scale),
+            &(&bench, &rt, scale),
+            |b, (bench, rt, scale)| {
+                let bench = bench.borrow();
+                b.iter(|| {
+                    rt.block_on(bench.quick_protobuf_parser_concurrent(*scale))
+                        .unwrap()
+                })
+            },
+        );
+
+        group.bench_with_input(
+            BenchmarkId::new("rust_protobuf", scale),
+            &(&bench, &rt, scale),
+            |b, (bench, rt, scale)| {
+                let bench = bench.borrow();
+                b.iter(|| {
+                    rt.block_on(bench.rust_protobuf_parser_concurrent(*scale))
+                        .unwrap()
+                })
+            },
+        );
+    }
+    group.finish();
+}
+
 criterion_group!(
     name = benches;
     config = Criterion::default();
-    targets = bench_manifest_encoding,
+    targets = bench_manifest_encoding, bench_remote_write,
 );
 
 criterion_main!(benches);
diff --git a/src/benchmarks/build.rs b/src/benchmarks/build.rs
new file mode 100644
index 00000000..a2f95931
--- /dev/null
+++ b/src/benchmarks/build.rs
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{env, fs, path::PathBuf, process::Command};
+
+fn main() {
+    // Similar to prost, we generate rust-protobuf and quick-protobuf code to
+    // OUT_DIR instead of the src directory.
+    let proto_path = "../pb_types/protos/remote_write.proto";
+    let include_path = "../pb_types/protos";
+    let out_dir = env::var("OUT_DIR").unwrap();
+    let out_dir_path = PathBuf::from(&out_dir);
+
+    // Generate rust-protobuf code to OUT_DIR.
+    protobuf_codegen::Codegen::new()
+        .pure()
+        .out_dir(&out_dir)
+        .input(proto_path)
+        .include(include_path)
+        .run()
+        .expect("rust-protobuf code generation failed");
+
+    // Rename rust-protobuf generated file to avoid potential conflicts.
+    let src_file = out_dir_path.join("remote_write.rs");
+    let dst_file = out_dir_path.join("rust_protobuf_remote_write.rs");
+    fs::rename(&src_file, &dst_file).expect("rust-protobuf file rename 
failed");
+
+    // Generate quick-protobuf code to OUT_DIR using pb-rs command line tool.
+    let quick_protobuf_file = 
out_dir_path.join("quick_protobuf_remote_write.rs");
+    let output = Command::new("pb-rs")
+        .args([
+            "-I",
+            include_path,
+            "-o",
+            quick_protobuf_file.to_str().unwrap(),
+            "-s",
+            proto_path,
+        ])
+        .output()
+        .expect("pb-rs command execution failed");
+
+    if !output.status.success() {
+        panic!(
+            "pb-rs command execution failed: {}",
+            String::from_utf8_lossy(&output.stderr)
+        );
+    }
+
+    // Fix package namespace conflicts and inner attributes using sed.
+    let output = Command::new("sed")
+        .args([
+            "-i",
+            "-e",
+            "s/remote_write:://g",
+            "-e",
+            r"s/#!\[/#[/g",
+            "-e",
+            r"s/^\/\/! /\/\/ /g",
+            "-e",
+            
"s/pb_types::mod_MetricMetadata::MetricType/mod_MetricMetadata::MetricType/g",
+            quick_protobuf_file.to_str().unwrap(),
+        ])
+        .output()
+        .expect("sed command execution failed");
+    if !output.status.success() {
+        eprintln!(
+            "warning: sed patching quick-protobuf output failed: {}",
+            String::from_utf8_lossy(&output.stderr)
+        );
+    }
+
+    // Fix inner attributes in rust-protobuf generated file.
+    let output = Command::new("sed")
+        .args([
+            "-i",
+            "-e",
+            r"s/#!\[/#[/g",
+            "-e",
+            r"s/^\/\/! /\/\/ /g",
+            dst_file.to_str().unwrap(),
+        ])
+        .output()
+        .expect("sed command execution failed");
+    if !output.status.success() {
+        eprintln!(
+            "warning: sed patching rust-protobuf output failed: {}",
+            String::from_utf8_lossy(&output.stderr)
+        );
+    }
+
+    println!("cargo:rerun-if-changed={}", proto_path);
+}
diff --git a/src/benchmarks/config.toml b/src/benchmarks/config.toml
index 1859f303..00d27006 100644
--- a/src/benchmarks/config.toml
+++ b/src/benchmarks/config.toml
@@ -20,3 +20,8 @@ record_count = 1000
 append_count = 100
 bench_measurement_time = "5s"
 bench_sample_size = 10
+
+[remote_write]
+workload_file = "../remote_write/tests/workloads/1709380533560664458.data"
+sequential_scales = [1, 5, 10, 20, 50, 100]
+concurrent_scales = [1, 5, 10, 20, 50, 100]
diff --git a/src/benchmarks/remote_write_memory_bench.py 
b/src/benchmarks/remote_write_memory_bench.py
new file mode 100644
index 00000000..42a2e502
--- /dev/null
+++ b/src/benchmarks/remote_write_memory_bench.py
@@ -0,0 +1,237 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import subprocess
+import json
+import sys
+import os
+from typing import Dict, Any
+import argparse
+
+try:
+    from tabulate import tabulate
+except ImportError:
+    print("Error: tabulate library not found.")
+    print("Please install it with: pip3 install tabulate")
+    sys.exit(1)
+
+
+class MemoryBenchmark:
+    def __init__(self, scale, mode, use_unsafe=False):
+        self.project_path = "."
+        self.data_path = 
"../remote_write/tests/workloads/1709380533560664458.data"
+        self.scale = scale
+        self.mode = mode
+        self.use_unsafe = use_unsafe
+        self.parsers = [
+            "pooled",
+            "prost",
+            "rust-protobuf",
+            "quick-protobuf",
+        ]
+
+    def build_binary(self) -> bool:
+        features_msg = " with unsafe-split" if self.use_unsafe else ""
+        print(f"Building binary{features_msg}...")
+        try:
+            bin_name = "parser_mem"
+            build_cmd = ["cargo", "build", "--release", "--bin", bin_name]
+            if self.use_unsafe:
+                build_cmd.extend(["--features", "unsafe-split"])
+            result = subprocess.run(
+                build_cmd,
+                cwd=self.project_path,
+                check=False,
+            )
+            if result.returncode != 0:
+                print("Failed to build binary")
+                return False
+            return True
+        except Exception as e:
+            print(f"Failed to build binary: {e}")
+            return False
+
+    def run_parser(self, parser: str, mode: str, scale: int, bin_name: str) -> 
Dict[str, Any]:
+        binary_path = f"../../target/release/{bin_name}"
+
+        cmd = [binary_path, mode, str(scale), parser]
+
+        try:
+            result = subprocess.run(
+                cmd,
+                cwd=self.project_path,
+                capture_output=True,
+                text=True,
+                timeout=300,  # 5 minute timeout
+            )
+
+            if result.returncode != 0:
+                print(f"Error running {parser}: {result.stderr}")
+                return {}
+
+            return json.loads(result.stdout.strip())
+
+        except subprocess.TimeoutExpired:
+            print(f"Timeout running {parser} {mode} {scale}")
+            return {}
+        except json.JSONDecodeError as e:
+            print(f"Failed to parse JSON from {parser}: {e}")
+            print(f"Raw output: {result.stdout}")
+            return {}
+        except FileNotFoundError:
+            print(f"Binary not found: {binary_path}")
+            print(f"Please run: cargo build --release --bins")
+            return {}
+        except Exception as e:
+            print(f"Exception running {parser}: {e}")
+            return {}
+
+    def run_benchmarks(self) -> Dict[str, Dict[str, Any]]:
+        results = {}
+        successful_count = 0
+        total_count = len(self.parsers)
+
+        print(f"\nRunning benchmarks for {total_count} parsers...")
+
+        for i, parser in enumerate(self.parsers, 1):
+            print(f"\n[{i}/{total_count}] Testing {parser}...")
+            print(f"Running {self.mode} mode with scale {self.scale}...")
+            result = self.run_parser(
+                parser, self.mode, self.scale, "parser_mem")
+
+            if result:
+                result["parser"] = parser
+                results[parser] = result
+                successful_count += 1
+                print(f"Success")
+            else:
+                print(f"Failed")
+
+        print(
+            f"\nCompleted: {successful_count}/{total_count} parsers succeeded")
+
+        if successful_count == total_count:
+            print("All parsers succeeded - generating report...")
+            return results
+        else:
+            print("Some parsers failed - skipping report generation")
+            return {}
+
+    def analyze_results(self, results: Dict[str, Dict[str, Any]]):
+        if not results:
+            print("\nNo results to analyze - all parsers failed or were 
skipped")
+            return
+
+        print(f"\n{'='*80}")
+        print("MEMORY BENCHMARK RESULTS")
+        print(f"{'='*80}")
+        print(f"Mode: {self.mode.upper()}, Scale: {self.scale}")
+        print()
+
+        headers = [
+            "Parser",
+            "ThreadAlloc",
+            "ThreadDealloc",
+            "Allocated",
+            "Active",
+            "Metadata",
+            "Mapped",
+            "Resident",
+            "Retained",
+        ]
+
+        table_data = []
+        for parser in self.parsers:
+            if parser in results:
+                result = results[parser]
+                memory = result.get("memory", {})
+                row = [
+                    parser,
+                    f"{memory.get('thread_allocated_diff', 0):,}",
+                    f"{memory.get('thread_deallocated_diff', 0):,}",
+                    f"{memory.get('allocated', 0):,}",
+                    f"{memory.get('active', 0):,}",
+                    f"{memory.get('metadata', 0):,}",
+                    f"{memory.get('mapped', 0):,}",
+                    f"{memory.get('resident', 0):,}",
+                    f"{memory.get('retained', 0):,}",
+                ]
+                table_data.append(row)
+
+        print("SUMMARY TABLE (All values in bytes)")
+        print(
+            tabulate(
+                table_data,
+                headers=headers,
+                tablefmt="grid",
+                stralign="right",
+                numalign="right",
+            )
+        )
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="Memory benchmark for protobuf parsers"
+    )
+    parser.add_argument(
+        "--unsafe", action="store_true", help="Enable unsafe-split feature"
+    )
+    parser.add_argument(
+        "--mode",
+        choices=["sequential", "concurrent"],
+        default="sequential",
+        help="Test mode to run (default: sequential)",
+    )
+    parser.add_argument(
+        "--scale",
+        type=int,
+        default=10,
+        help="Scale value for benchmark (default: 10)",
+    )
+
+    args = parser.parse_args()
+
+    if args.scale <= 0:
+        print(f"Invalid scale value '{args.scale}', scale must be positive")
+        sys.exit(1)
+
+    data_path = "../remote_write/tests/workloads/1709380533560664458.data"
+    if not os.path.exists(data_path):
+        print(f"Test data file not found at {data_path}")
+        print("Please ensure test data exists before running benchmarks")
+        sys.exit(1)
+
+    benchmark = MemoryBenchmark(
+        scale=args.scale, mode=args.mode, use_unsafe=args.unsafe
+    )
+
+    if not benchmark.build_binary():
+        sys.exit(1)
+
+    print(f"\nRunning memory benchmarks...")
+    print(f"Mode: {args.mode}")
+    print(f"Scale: {args.scale}")
+    print(f"Unsafe optimization: {'enabled' if args.unsafe else 'disabled'}")
+
+    results = benchmark.run_benchmarks()
+    benchmark.analyze_results(results)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/benchmarks/src/bin/parser_mem.rs 
b/src/benchmarks/src/bin/parser_mem.rs
new file mode 100644
index 00000000..0028cd57
--- /dev/null
+++ b/src/benchmarks/src/bin/parser_mem.rs
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use benchmarks::util::{MemoryBenchConfig, MemoryStats};
+use pb_types::WriteRequest as ProstWriteRequest;
+use prost::Message;
+use protobuf::Message as ProtobufMessage;
+use quick_protobuf::{BytesReader, MessageRead};
+use remote_write::pooled_parser::PooledParser;
+use tikv_jemallocator::Jemalloc;
+
+#[global_allocator]
+static ALLOC: Jemalloc = Jemalloc;
+
+#[tokio::main(flavor = "current_thread")]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let config = MemoryBenchConfig::from_args();
+    let args: Vec<String> = std::env::args().collect();
+    let parser = args.get(3).map(|s| s.as_str()).unwrap_or("pooled");
+
+    let start_stats = MemoryStats::collect()?;
+
+    match config.mode.as_str() {
+        "sequential" => match parser {
+            "pooled" => {
+                let parser = PooledParser;
+                for _ in 0..config.scale {
+                    let _ = 
parser.decode_async(config.test_data.clone()).await?;
+                }
+            }
+            "prost" => {
+                for _ in 0..config.scale {
+                    ProstWriteRequest::decode(config.test_data.clone())?;
+                }
+            }
+            "rust-protobuf" => {
+                for _ in 0..config.scale {
+                    let _ = 
benchmarks::rust_protobuf_remote_write::WriteRequest::parse_from_bytes(
+                        &config.test_data,
+                    )?;
+                }
+            }
+            "quick-protobuf" => {
+                for _ in 0..config.scale {
+                    let mut reader = 
BytesReader::from_bytes(&config.test_data);
+                    let _ = 
benchmarks::quick_protobuf_remote_write::WriteRequest::from_reader(
+                        &mut reader,
+                        &config.test_data,
+                    )?;
+                }
+            }
+            other => panic!("unknown parser: {}", other),
+        },
+        "concurrent" => match parser {
+            "pooled" => {
+                let mut handles = Vec::new();
+                for _ in 0..config.scale {
+                    let data_clone = config.test_data.clone();
+                    let handle = tokio::spawn(async move {
+                        let parser = PooledParser;
+                        let _ = parser.decode_async(data_clone).await;
+                    });
+                    handles.push(handle);
+                }
+                for handle in handles {
+                    handle.await?;
+                }
+            }
+            "prost" => {
+                let mut handles = Vec::new();
+                for _ in 0..config.scale {
+                    let data_clone = config.test_data.clone();
+                    let handle = tokio::spawn(async move {
+                        let _ = ProstWriteRequest::decode(data_clone);
+                    });
+                    handles.push(handle);
+                }
+                for handle in handles {
+                    handle.await?;
+                }
+            }
+            "rust-protobuf" => {
+                let mut handles = Vec::new();
+                for _ in 0..config.scale {
+                    let data_clone = config.test_data.clone();
+                    let handle = tokio::spawn(async move {
+                        let _ =
+                            
benchmarks::rust_protobuf_remote_write::WriteRequest::parse_from_bytes(
+                                &data_clone,
+                            );
+                    });
+                    handles.push(handle);
+                }
+                for handle in handles {
+                    handle.await?;
+                }
+            }
+            "quick-protobuf" => {
+                let mut handles = Vec::new();
+                for _ in 0..config.scale {
+                    let data_clone = config.test_data.clone();
+                    let handle = tokio::spawn(async move {
+                        let mut reader = BytesReader::from_bytes(&data_clone);
+                        let _ = 
benchmarks::quick_protobuf_remote_write::WriteRequest::from_reader(
+                            &mut reader,
+                            &data_clone,
+                        );
+                    });
+                    handles.push(handle);
+                }
+                for handle in handles {
+                    handle.await?;
+                }
+            }
+            other => panic!("unknown parser: {}", other),
+        },
+        _ => panic!("invalid mode"),
+    }
+
+    let end_stats = MemoryStats::collect()?;
+    let memory_diff = start_stats.diff(&end_stats);
+    config.output_json(&memory_diff);
+    Ok(())
+}
diff --git a/src/benchmarks/src/bin/pool_stats.rs 
b/src/benchmarks/src/bin/pool_stats.rs
new file mode 100644
index 00000000..48115075
--- /dev/null
+++ b/src/benchmarks/src/bin/pool_stats.rs
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! evaluate the efficiency of the deadpool-backed object pool.
+
+use std::fs;
+
+use bytes::Bytes;
+use remote_write::{pooled_parser::PooledParser, pooled_types::POOL};
+use tikv_jemallocator::Jemalloc;
+use tokio::task::JoinHandle;
+
+#[global_allocator]
+static ALLOC: Jemalloc = Jemalloc;
+
+async fn run_concurrent_parsing(scale: usize) -> deadpool::Status {
+    let data = 
fs::read("../remote_write/tests/workloads/1709380533560664458.data")
+        .expect("test data load failed");
+    let data = Bytes::from(data);
+
+    let handles: Vec<JoinHandle<()>> = (0..scale)
+        .map(|_| {
+            let data = data.clone();
+            tokio::spawn(async move {
+                let parser = PooledParser;
+                let _ = parser
+                    .decode_async(data.clone())
+                    .await
+                    .expect("parse failed");
+            })
+        })
+        .collect();
+
+    for handle in handles {
+        handle.await.expect("task completion failed");
+    }
+
+    POOL.status()
+}
+
+#[tokio::main]
+async fn main() {
+    let scale_values = [1, 2, 5, 10, 20, 50, 100, 200, 500];
+
+    println!(
+        "{:<8} {:<10} {:<10} {:<10} {:<10}",
+        "Scale", "MaxSize", "PoolSize", "Available", "Waiting"
+    );
+    println!("{}", "=".repeat(50));
+
+    for &scale in &scale_values {
+        let status = run_concurrent_parsing(scale).await;
+
+        println!(
+            "{:<8} {:<10} {:<10} {:<10} {:<10}",
+            scale, status.max_size, status.size, status.available, 
status.waiting
+        );
+
+        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    }
+
+    println!("=== Final Pool Status ===");
+    let final_status = POOL.status();
+    println!("Max Pool Size: {}", final_status.max_size);
+    println!("Current Pool Size: {}", final_status.size);
+    println!("Available Objects: {}", final_status.available);
+    println!("Waiting Requests: {}", final_status.waiting);
+}
diff --git a/src/benchmarks/src/config.rs b/src/benchmarks/src/config.rs
index 27577f25..199ef627 100644
--- a/src/benchmarks/src/config.rs
+++ b/src/benchmarks/src/config.rs
@@ -27,6 +27,7 @@ const BENCH_CONFIG_PATH_KEY: &str = "BENCH_CONFIG_PATH";
 #[derive(Debug, Deserialize)]
 pub struct BenchConfig {
     pub manifest: ManifestConfig,
+    pub remote_write: RemoteWriteConfig,
 }
 
 pub fn config_from_env() -> BenchConfig {
@@ -48,3 +49,10 @@ pub struct ManifestConfig {
     pub bench_measurement_time: ReadableDuration,
     pub bench_sample_size: usize,
 }
+
+#[derive(Deserialize, Debug, Clone)]
+pub struct RemoteWriteConfig {
+    pub workload_file: String,
+    pub sequential_scales: Vec<usize>,
+    pub concurrent_scales: Vec<usize>,
+}
diff --git a/src/benchmarks/src/lib.rs b/src/benchmarks/src/lib.rs
index 180d1b8a..20d2564f 100644
--- a/src/benchmarks/src/lib.rs
+++ b/src/benchmarks/src/lib.rs
@@ -19,4 +19,17 @@
 
 pub mod config;
 pub mod encoding_bench;
-mod util;
+pub mod remote_write_bench;
+pub mod util;
+
+#[allow(clippy::all)]
+#[allow(warnings)]
+pub mod quick_protobuf_remote_write {
+    include!(concat!(env!("OUT_DIR"), "/quick_protobuf_remote_write.rs"));
+}
+
+#[allow(clippy::all)]
+#[allow(warnings)]
+pub mod rust_protobuf_remote_write {
+    include!(concat!(env!("OUT_DIR"), "/rust_protobuf_remote_write.rs"));
+}
diff --git a/src/benchmarks/src/remote_write_bench.rs 
b/src/benchmarks/src/remote_write_bench.rs
new file mode 100644
index 00000000..b5372da5
--- /dev/null
+++ b/src/benchmarks/src/remote_write_bench.rs
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! remote write parser bench.
+
+use std::{fs, path::PathBuf};
+
+use bytes::Bytes;
+use pb_types::WriteRequest as ProstWriteRequest;
+use prost::Message;
+use protobuf::Message as ProtobufMessage;
+use quick_protobuf::{BytesReader, MessageRead};
+use remote_write::pooled_parser::PooledParser;
+use tokio::task::JoinHandle;
+
+use crate::{
+    config::RemoteWriteConfig,
+    quick_protobuf_remote_write::WriteRequest as QuickProtobufWriteRequest,
+    rust_protobuf_remote_write::WriteRequest as RustProtobufWriteRequest,
+};
+
+pub struct RemoteWriteBench {
+    raw_data: Vec<u8>,
+}
+
+impl RemoteWriteBench {
+    pub fn new(config: RemoteWriteConfig) -> Self {
+        let mut workload_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+        workload_path.push(&config.workload_file);
+
+        let raw_data = fs::read(&workload_path)
+            .unwrap_or_else(|_| panic!("failed to read workload file: {:?}", 
workload_path));
+
+        Self { raw_data }
+    }
+
+    // prost parser sequential bench.
+    pub fn prost_parser_sequential(&self, scale: usize) -> Result<(), String> {
+        for _ in 0..scale {
+            let data = Bytes::from(self.raw_data.clone());
+            ProstWriteRequest::decode(data)
+                .map_err(|e| format!("prost sequential parse failed: {}", e))?;
+        }
+        Ok(())
+    }
+
+    // Hand-written pooled parser sequential bench.
+    pub async fn pooled_parser_sequential(&self, scale: usize) -> Result<(), 
String> {
+        let parser = PooledParser;
+        for _ in 0..scale {
+            let data = Bytes::from(self.raw_data.clone());
+            let _ = parser
+                .decode_async(data.clone())
+                .await
+                .map_err(|e| format!("pooled sequential parse failed: {:?}", 
e))?;
+        }
+        Ok(())
+    }
+
+    // quick-protobuf parser sequential bench.
+    pub fn quick_protobuf_parser_sequential(&self, scale: usize) -> Result<(), 
String> {
+        for _ in 0..scale {
+            let mut reader = BytesReader::from_bytes(&self.raw_data);
+            QuickProtobufWriteRequest::from_reader(&mut reader, &self.raw_data)
+                .map_err(|e| format!("quick-protobuf sequential parse failed: 
{}", e))?;
+        }
+        Ok(())
+    }
+
+    // rust-protobuf parser sequential bench.
+    pub fn rust_protobuf_parser_sequential(&self, scale: usize) -> Result<(), 
String> {
+        for _ in 0..scale {
+            RustProtobufWriteRequest::parse_from_bytes(&self.raw_data)
+                .map_err(|e| format!("rust-protobuf sequential parse failed: 
{}", e))?;
+        }
+        Ok(())
+    }
+
+    // prost parser concurrent bench.
+    pub async fn prost_parser_concurrent(&self, scale: usize) -> Result<(), 
String> {
+        let join_handles: Vec<JoinHandle<Result<(), String>>> = (0..scale)
+            .map(|_| {
+                let raw_data = self.raw_data.clone();
+                tokio::spawn(async move {
+                    let data = Bytes::from(raw_data);
+                    ProstWriteRequest::decode(data)
+                        .map_err(|e| format!("prost concurrent parse failed: 
{}", e))?;
+                    Ok(())
+                })
+            })
+            .collect();
+
+        for join_handle in join_handles {
+            join_handle.await.unwrap()?;
+        }
+        Ok(())
+    }
+
+    // Hand-written pooled parser concurrent bench.
+    pub async fn pooled_parser_concurrent(&self, scale: usize) -> Result<(), 
String> {
+        let parser = PooledParser;
+        let join_handles: Vec<JoinHandle<Result<(), String>>> = (0..scale)
+            .map(|_| {
+                let parser = parser.clone();
+                let raw_data = self.raw_data.clone();
+                tokio::spawn(async move {
+                    let data = Bytes::from(raw_data);
+                    let _ = parser
+                        .decode_async(data.clone())
+                        .await
+                        .map_err(|e| format!("pooled concurrent parse failed: 
{:?}", e))?;
+                    Ok(())
+                })
+            })
+            .collect();
+
+        for join_handle in join_handles {
+            join_handle.await.unwrap()?;
+        }
+        Ok(())
+    }
+
+    // quick-protobuf parser concurrent bench.
+    pub async fn quick_protobuf_parser_concurrent(&self, scale: usize) -> 
Result<(), String> {
+        let join_handles: Vec<tokio::task::JoinHandle<Result<(), String>>> = 
(0..scale)
+            .map(|_| {
+                let data = self.raw_data.clone();
+                tokio::spawn(async move {
+                    let mut reader = BytesReader::from_bytes(&data);
+                    QuickProtobufWriteRequest::from_reader(&mut reader, &data)
+                        .map_err(|e| format!("quick-protobuf concurrent parse 
failed: {}", e))?;
+                    Ok(())
+                })
+            })
+            .collect();
+
+        for join_handle in join_handles {
+            join_handle.await.unwrap()?;
+        }
+        Ok(())
+    }
+
+    // rust-protobuf parser concurrent bench.
+    pub async fn rust_protobuf_parser_concurrent(&self, scale: usize) -> 
Result<(), String> {
+        let join_handles: Vec<tokio::task::JoinHandle<Result<(), String>>> = 
(0..scale)
+            .map(|_| {
+                let data = self.raw_data.clone();
+                tokio::spawn(async move {
+                    RustProtobufWriteRequest::parse_from_bytes(&data)
+                        .map_err(|e| format!("rust-protobuf concurrent parse 
failed: {}", e))?;
+                    Ok(())
+                })
+            })
+            .collect();
+
+        for join_handle in join_handles {
+            join_handle.await.unwrap()?;
+        }
+        Ok(())
+    }
+}
diff --git a/src/benchmarks/src/util.rs b/src/benchmarks/src/util.rs
index 623b5b06..4d74f785 100644
--- a/src/benchmarks/src/util.rs
+++ b/src/benchmarks/src/util.rs
@@ -18,15 +18,20 @@
 //! Utilities for benchmarks.
 
 use std::{
+    env,
     fmt::{self, Write},
+    fs,
     str::FromStr,
     time::Duration,
 };
 
+use bytes::Bytes;
 use serde::{
     de::{self, Visitor},
     Deserialize, Deserializer, Serialize, Serializer,
 };
+use serde_json::json;
+use tikv_jemalloc_ctl::{epoch, stats, thread};
 
 #[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Default)]
 pub struct ReadableDuration(pub Duration);
@@ -168,3 +173,123 @@ impl<'de> Deserialize<'de> for ReadableDuration {
         deserializer.deserialize_str(DurVisitor)
     }
 }
+
+// Memory bench utilities.
+#[derive(Debug, Clone)]
+pub struct MemoryStats {
+    pub thread_allocated: u64,
+    pub thread_deallocated: u64,
+    pub allocated: u64,
+    pub active: u64,
+    pub metadata: u64,
+    pub mapped: u64,
+    pub resident: u64,
+    pub retained: u64,
+}
+
+#[derive(Debug, Clone)]
+pub struct MemoryStatsDiff {
+    pub thread_allocated_diff: i64,
+    pub thread_deallocated_diff: i64,
+    pub allocated: i64,
+    pub active: i64,
+    pub metadata: i64,
+    pub mapped: i64,
+    pub resident: i64,
+    pub retained: i64,
+}
+
+impl MemoryStats {
+    pub fn collect() -> Result<Self, String> {
+        epoch::advance().map_err(|e| format!("failed to advance jemalloc 
epoch: {}", e))?;
+
+        Ok(MemoryStats {
+            thread_allocated: thread::allocatedp::read()
+                .map_err(|e| format!("failed to read thread.allocatedp: {}", 
e))?
+                .get(),
+            thread_deallocated: thread::deallocatedp::read()
+                .map_err(|e| format!("failed to read thread.deallocatedp: {}", 
e))?
+                .get(),
+            allocated: stats::allocated::read()
+                .map_err(|e| format!("failed to read allocated: {}", e))?
+                .try_into()
+                .unwrap(),
+            active: stats::active::read()
+                .map_err(|e| format!("failed to read active: {}", e))?
+                .try_into()
+                .unwrap(),
+            metadata: stats::metadata::read()
+                .map_err(|e| format!("failed to read metadata: {}", e))?
+                .try_into()
+                .unwrap(),
+            mapped: stats::mapped::read()
+                .map_err(|e| format!("failed to read mapped: {}", e))?
+                .try_into()
+                .unwrap(),
+            resident: stats::resident::read()
+                .map_err(|e| format!("failed to read resident: {}", e))?
+                .try_into()
+                .unwrap(),
+            retained: stats::retained::read()
+                .map_err(|e| format!("failed to read retained: {}", e))?
+                .try_into()
+                .unwrap(),
+        })
+    }
+
+    pub fn diff(&self, other: &MemoryStats) -> MemoryStatsDiff {
+        MemoryStatsDiff {
+            thread_allocated_diff: other.thread_allocated as i64 - 
self.thread_allocated as i64,
+            thread_deallocated_diff: other.thread_deallocated as i64
+                - self.thread_deallocated as i64,
+            allocated: other.allocated as i64 - self.allocated as i64,
+            active: other.active as i64 - self.active as i64,
+            metadata: other.metadata as i64 - self.metadata as i64,
+            mapped: other.mapped as i64 - self.mapped as i64,
+            resident: other.resident as i64 - self.resident as i64,
+            retained: other.retained as i64 - self.retained as i64,
+        }
+    }
+}
+
+pub struct MemoryBenchConfig {
+    pub test_data: Bytes,
+    pub scale: usize,
+    pub mode: String,
+}
+
+impl MemoryBenchConfig {
+    pub fn from_args() -> Self {
+        let args: Vec<String> = env::args().collect();
+        let mode = args[1].clone();
+        let scale: usize = args[2].parse().expect("invalid scale");
+        let test_data = Bytes::from(
+            
fs::read("../remote_write/tests/workloads/1709380533560664458.data")
+                .expect("test data load failed"),
+        );
+
+        MemoryBenchConfig {
+            test_data,
+            scale,
+            mode,
+        }
+    }
+
+    pub fn output_json(&self, memory_diff: &MemoryStatsDiff) {
+        let result = json!({
+            "mode": self.mode,
+            "scale": self.scale,
+            "memory": {
+                "thread_allocated_diff": memory_diff.thread_allocated_diff,
+                "thread_deallocated_diff": memory_diff.thread_deallocated_diff,
+                "allocated": memory_diff.allocated,
+                "active": memory_diff.active,
+                "metadata": memory_diff.metadata,
+                "mapped": memory_diff.mapped,
+                "resident": memory_diff.resident,
+                "retained": memory_diff.retained
+            }
+        });
+        println!("{}", result);
+    }
+}
diff --git a/src/pb_types/build.rs b/src/pb_types/build.rs
index 7eb68464..1dc9a4ff 100644
--- a/src/pb_types/build.rs
+++ b/src/pb_types/build.rs
@@ -18,6 +18,9 @@
 use std::io::Result;
 
 fn main() -> Result<()> {
-    prost_build::compile_protos(&["protos/sst.proto"], &["protos/"])?;
+    prost_build::compile_protos(
+        &["protos/sst.proto", "protos/remote_write.proto"],
+        &["protos/"],
+    )?;
     Ok(())
 }
diff --git a/src/pb_types/protos/remote_write.proto 
b/src/pb_types/protos/remote_write.proto
new file mode 100644
index 00000000..110ac67b
--- /dev/null
+++ b/src/pb_types/protos/remote_write.proto
@@ -0,0 +1,77 @@
+// Copyright 2016 Prometheus Team
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This file is is modified from
+// https://github.com/prometheus/prometheus/blob/main/prompb/remote.proto
+// and https://github.com/prometheus/prometheus/blob/main/prompb/types.proto
+
+syntax = "proto3";
+package pb_types.remote_write;
+
+message WriteRequest {
+  repeated TimeSeries timeseries = 1;
+  // Cortex uses this field to determine the source of the write request.
+  // We reserve it to avoid any compatibility issues.
+  reserved  2;
+  repeated MetricMetadata metadata = 3;
+}
+
+message MetricMetadata {
+  enum MetricType {
+    UNKNOWN        = 0;
+    COUNTER        = 1;
+    GAUGE          = 2;
+    HISTOGRAM      = 3;
+    GAUGEHISTOGRAM = 4;
+    SUMMARY        = 5;
+    INFO           = 6;
+    STATESET       = 7;
+  }
+
+  // Represents the metric type, these match the set from Prometheus.
+  // Refer to pkg/textparse/interface.go for details.
+  MetricType type = 1;
+  string metric_family_name = 2;
+  string help = 4;
+  string unit = 5;
+}
+
+// TimeSeries represents samples and labels for a single time series.
+message TimeSeries {
+  // For a timeseries to be valid, and for the samples and exemplars
+  // to be ingested by the remote system properly, the labels field is 
required.
+  repeated Label labels   = 1;
+  repeated Sample samples = 2;
+  repeated Exemplar exemplars = 3;
+}
+
+message Label {
+  string name  = 1;
+  string value = 2;
+}
+
+message Sample {
+  double value    = 1;
+  // timestamp is in ms format, see pkg/timestamp/timestamp.go for
+  // conversion from time.Time to Prometheus timestamp.
+  int64 timestamp = 2;
+}
+
+message Exemplar {
+  // Optional, can be empty.
+  repeated Label labels = 1;
+  double value = 2;
+  // timestamp is in ms format, see pkg/timestamp/timestamp.go for
+  // conversion from time.Time to Prometheus timestamp.
+  int64 timestamp = 3;
+}
\ No newline at end of file
diff --git a/src/pb_types/src/lib.rs b/src/pb_types/src/lib.rs
index bfa215b0..727a1d8f 100644
--- a/src/pb_types/src/lib.rs
+++ b/src/pb_types/src/lib.rs
@@ -19,4 +19,9 @@ mod pb_types {
     include!(concat!(env!("OUT_DIR"), "/pb_types.sst.rs"));
 }
 
+mod prost_remote_write {
+    include!(concat!(env!("OUT_DIR"), "/pb_types.remote_write.rs"));
+}
+
 pub use pb_types::*;
+pub use prost_remote_write::*;
diff --git a/src/benchmarks/Cargo.toml b/src/remote_write/Cargo.toml
similarity index 77%
copy from src/benchmarks/Cargo.toml
copy to src/remote_write/Cargo.toml
index a6c017bf..344308c4 100644
--- a/src/benchmarks/Cargo.toml
+++ b/src/remote_write/Cargo.toml
@@ -16,7 +16,7 @@
 # under the License.
 
 [package]
-name = "benchmarks"
+name = "remote_write"
 version.workspace = true
 authors.workspace = true
 edition.workspace = true
@@ -25,20 +25,15 @@ repository.workspace = true
 homepage.workspace = true
 description.workspace = true
 
+[features]
+unsafe-split = []
+
 [dependencies]
+anyhow = { workspace = true }
+async-trait = { workspace = true }
 bytes = { workspace = true }
-columnar_storage = { workspace = true }
-common = { workspace = true }
+deadpool = { workspace = true }
+once_cell = { workspace = true }
 pb_types = { workspace = true }
 prost = { workspace = true }
-serde = { workspace = true }
-toml = { workspace = true }
-tracing = { workspace = true }
-tracing-subscriber = { workspace = true }
-
-[dev-dependencies]
-criterion = { workspace = true }
-
-[[bench]]
-name = "bench"
-harness = false
+tokio = { workspace = true }
diff --git a/src/remote_write/README.md b/src/remote_write/README.md
new file mode 100644
index 00000000..f41a00c5
--- /dev/null
+++ b/src/remote_write/README.md
@@ -0,0 +1,291 @@
+# Remote Write Parser
+
+A hand-written [Prometheus Remote Write Request 
(V1)](https://prometheus.io/docs/specs/prw/remote_write_spec/) parser optimized 
for zero-allocation parsing. It receives protobuf data as `Bytes` and returns a 
parsed `WriteRequest` instance.
+
+## Implementation
+
+Key optimization techniques:
+
+- Object pooling backed by deadpool.
+
+- `RepeatedField` data structures.
+
+- Zero-copy bytes split backed by unsafe.
+
+- Manual loop unrolling and function inline optimization.
+
+## Performance
+
+This section presents comprehensive performance analysis of the hand-written 
pooled parser compared to other popular Rust protobuf libraries 
([prost](https://github.com/tokio-rs/prost), 
[rust-protobuf](https://github.com/stepancheg/rust-protobuf), 
[quick-protobuf](https://github.com/tafia/quick-protobuf)) and the 
[easyproto](https://github.com/VictoriaMetrics/easyproto) (Go) implementation. 
All tests were conducted on Ubuntu 22.04.4 LTS x86_64 with AMD EPYC 7742 (8) @ 
2.249GHz CPU and 16GB RAM.
+
+### Prerequisites
+
+Install the required dependencies:
+
+```shell
+cargo install pb-rs
+pip3 install tabulate matplotlib
+```
+
+### CPU Time
+
+#### Steps
+
+Navigate to the benchmarks directory:
+
+```shell
+cd src/benchmarks
+```
+
+Run the standard benchmarks:
+
+```shell
+BENCH_CONFIG_PATH=config.toml cargo bench --bench bench remote_write
+```
+
+Or enable unsafe optimization for better performance:
+
+```shell
+BENCH_CONFIG_PATH=config.toml cargo bench --features unsafe-split --bench 
bench remote_write
+```
+
+We also benchmarked against the 
[easyproto](https://github.com/VictoriaMetrics/easyproto) library for 
comparison:
+
+```shell
+git clone https://github.com/VictoriaMetrics/VictoriaMetrics.git
+git checkout d083ff790a203ecda1cbbd527c792ef19c159f91
+cd VictoriaMetrics/lib/prompb
+vim prom_decode_bench_test.go
+```
+
+and add the following code (please change the path of 
1709380533560664458.data):
+
+```go
+package prompb
+
+import (
+    "fmt"
+    "os"
+    "sync"
+    "testing"
+)
+
+type Decoder interface {
+    Parse(data []byte) error
+    Reset()
+    Clone() Decoder
+}
+
+type PooledDecoder struct {
+    pool *sync.Pool
+}
+
+func NewPooledDecoder() *PooledDecoder {
+    pool := &sync.Pool{
+        New: func() interface{} {
+            return &WriteRequest{}
+        },
+    }
+    return &PooledDecoder{pool: pool}
+}
+
+func (d *PooledDecoder) Parse(data []byte) error {
+    wr := d.pool.Get().(*WriteRequest)
+    defer d.pool.Put(wr)
+    wr.Reset()
+    return wr.UnmarshalProtobuf(data)
+}
+
+func (d *PooledDecoder) Reset() {
+    // Pool handles reset internally.
+}
+
+func (d *PooledDecoder) Clone() Decoder {
+    return d
+}
+
+type NoPoolDecoder struct {
+    wr *WriteRequest
+}
+
+func NewNoPoolDecoder() *NoPoolDecoder {
+    return &NoPoolDecoder{
+        wr: &WriteRequest{},
+    }
+}
+
+func (d *NoPoolDecoder) Parse(data []byte) error {
+    d.wr.Reset()
+    return d.wr.UnmarshalProtobuf(data)
+}
+
+func (d *NoPoolDecoder) Reset() {
+    d.wr.Reset()
+}
+
+func (d *NoPoolDecoder) Clone() Decoder {
+    return NewNoPoolDecoder()
+}
+
+func getTestDataPath() ([]byte, error) {
+    return os.ReadFile("1709380533560664458.data")
+}
+
+// Sequential benchmark.
+func benchDecoderSequential(decoder Decoder, data []byte, n int) error {
+    for i := 0; i < n; i++ {
+        decoder.Reset()
+        if err := decoder.Parse(data); err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+// Concurrent benchmark.
+func benchDecoderConcurrent(decoder Decoder, data []byte, workers int) error {
+    results := make(chan error, workers)
+
+    // Spawn workers (similar to tokio::spawn in Rust).
+    for w := 0; w < workers; w++ {
+        go func() {
+            clonedDecoder := decoder.Clone()
+            clonedDecoder.Reset()
+            err := clonedDecoder.Parse(data)
+            results <- err
+        }()
+    }
+
+    // Wait for all workers to complete (similar to join_handle.await).
+    for w := 0; w < workers; w++ {
+        if err := <-results; err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+func BenchmarkSequentialParse(b *testing.B) {
+    data, err := getTestDataPath()
+    if err != nil {
+        b.Skipf("test data file not found: %v", err)
+    }
+
+    decoders := map[string]Decoder{
+        "pooled": NewPooledDecoder(),
+        "nopool": NewNoPoolDecoder(),
+    }
+
+    iterations := []int{1, 5, 10, 20, 100}
+
+    for decoderName, decoder := range decoders {
+        for _, n := range iterations {
+            b.Run(fmt.Sprintf("%s/%d", decoderName, n), func(b *testing.B) {
+                b.ResetTimer()
+                for i := 0; i < b.N; i++ {
+                    if err := benchDecoderSequential(decoder, data, n); err != 
nil {
+                        b.Fatalf("failed to parse: %v", err)
+                    }
+                }
+            })
+        }
+    }
+}
+
+func BenchmarkConcurrentParse(b *testing.B) {
+    data, err := getTestDataPath()
+    if err != nil {
+        b.Skipf("test data file not found: %v", err)
+    }
+
+    decoders := map[string]Decoder{
+        "pooled": NewPooledDecoder(),
+        "nopool": NewNoPoolDecoder(),
+    }
+
+    workers := []int{1, 5, 10, 20, 100}
+
+    for decoderName, decoder := range decoders {
+        for _, w := range workers {
+            b.Run(fmt.Sprintf("%s/%d", decoderName, w), func(b *testing.B) {
+                b.ResetTimer()
+                for i := 0; i < b.N; i++ {
+                    if err := benchDecoderConcurrent(decoder, data, w); err != 
nil {
+                        b.Fatalf("failed to parse: %v", err)
+                    }
+                }
+            })
+        }
+    }
+}
+```
+
+Execute the Go benchmarks:
+
+```shell
+go test -bench=. -run=^$
+```
+
+#### Results
+
+Test results are as follows:
+
+![Sequential 
Performance](../../docs/assets/remote-write-sequential-performance.png)
+
+In sequential parsing scenarios, the hand-written pooled parsers (with and 
without unsafe optimization) achieve the best performance across all scales 
compared to other Rust parsers. The unsafe optimization provides nearly 50% 
performance improvement.
+
+![Concurrent 
Performance](../../docs/assets/remote-write-concurrent-performance.png)
+
+**Note**: Due to the nature of concurrent execution, concurrent parsing 
benchmark results may vary (sometimes dramatically) across different runs. 
However, we can still draw an overall conclusion.
+
+In concurrent parsing scenarios, from an overall perspective, the hand-written 
pooled parsers (with and without unsafe optimization) still achieve the best 
performance compared to other Rust parsers. The unsafe optimization continues 
to provide performance improvements.
+
+### Memory Allocation
+
+Navigate to the benchmarks directory:
+
+```shell
+cd src/benchmarks
+```
+
+Run memory allocation benchmarks:
+
+```shell
+python3 remote_write_memory_bench.py --mode sequential --scale 10
+```
+
+Or enable unsafe optimization:
+
+```shell
+python3 remote_write_memory_bench.py --mode concurrent --scale 10 --unsafe
+```
+
+**Note**: Sequential and concurrent mode results are similar due to the 
enforced `#[tokio::main(flavor = "current_thread")]` configuration. This 
constraint is necessary because Jemalloc's `thread::allocatedp` and 
`thread::deallocatedp` statistics can only track single-threaded allocations 
accurately.
+
+We focus on the 
[allocatedp](https://docs.rs/tikv-jemalloc-ctl/0.6.0/tikv_jemalloc_ctl/thread/struct.allocatedp.html)
 value to verify our zero-allocation parsing efforts, since it represents the 
number of bytes that **have ever been** allocated by the thread.
+
+The results are as follows:
+
+![Memory](../../docs/assets/remote-write-memory-performance.png)
+
+The hand-written pooled parser allocates minimal memory compared to other Rust 
parsers. Note that the difference between `ThreadAlloc` and `ThreadDealloc` in 
the pooled decoder is expected since we gather statistics right before the 
program terminates and objects remain in the pool (not freed) at that time.
+
+### Object Pool Efficiency
+
+Navigate to the benchmarks directory:
+
+```shell
+cd src/benchmarks
+```
+
+Analyze pool utilization:
+
+```shell
+cargo run --bin pool_stats --release
+```
+
+Our testing finds that only 8 objects in the pool are sufficient to handle 500 
concurrent parsing operations.
+
+## Acknowledgements
+
+The two test data files in `src/remote_write/tests/workloads` are taken from 
[prom-write-request-bench](https://github.com/v0y4g3r/prom-write-request-bench/tree/main/assets).
diff --git a/src/benchmarks/src/lib.rs b/src/remote_write/src/lib.rs
similarity index 90%
copy from src/benchmarks/src/lib.rs
copy to src/remote_write/src/lib.rs
index 180d1b8a..7b5b1573 100644
--- a/src/benchmarks/src/lib.rs
+++ b/src/remote_write/src/lib.rs
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! lib for benchmarks.
-
-pub mod config;
-pub mod encoding_bench;
-mod util;
+mod pb_reader;
+pub mod pooled_parser;
+pub mod pooled_types;
+mod repeated_field;
diff --git a/src/remote_write/src/pb_reader.rs 
b/src/remote_write/src/pb_reader.rs
new file mode 100644
index 00000000..9545d27e
--- /dev/null
+++ b/src/remote_write/src/pb_reader.rs
@@ -0,0 +1,565 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use anyhow::{ensure, Result};
+use bytes::{Buf, Bytes};
+
+use crate::pooled_types::{
+    Exemplar, Label, MetricMetadata, MetricType, Sample, TimeSeries, 
WriteRequest,
+};
+
+#[repr(u8)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub(crate) enum WireType {
+    Varint = 0,
+    SixtyFourBit = 1,
+    LengthDelimited = 2,
+}
+
+const FIELD_NUM_TIMESERIES: u32 = 1;
+const FIELD_NUM_METADATA: u32 = 3;
+const FIELD_NUM_LABELS: u32 = 1;
+const FIELD_NUM_SAMPLES: u32 = 2;
+const FIELD_NUM_EXEMPLARS: u32 = 3;
+const FIELD_NUM_LABEL_NAME: u32 = 1;
+const FIELD_NUM_LABEL_VALUE: u32 = 2;
+const FIELD_NUM_SAMPLE_VALUE: u32 = 1;
+const FIELD_NUM_SAMPLE_TIMESTAMP: u32 = 2;
+const FIELD_NUM_EXEMPLAR_LABELS: u32 = 1;
+const FIELD_NUM_EXEMPLAR_VALUE: u32 = 2;
+const FIELD_NUM_EXEMPLAR_TIMESTAMP: u32 = 3;
+const FIELD_NUM_METADATA_TYPE: u32 = 1;
+const FIELD_NUM_METADATA_FAMILY_NAME: u32 = 2;
+const FIELD_NUM_METADATA_HELP: u32 = 4;
+const FIELD_NUM_METADATA_UNIT: u32 = 5;
+
+// Taken from 
https://github.com/v0y4g3r/prom-write-request-bench/blob/step6/optimize-slice/src/bytes.rs
 under Apache License 2.0.
+#[cfg(feature = "unsafe-split")]
+#[inline(always)]
+unsafe fn copy_to_bytes(data: &mut Bytes, len: usize) -> Bytes {
+    if len == data.remaining() {
+        std::mem::replace(data, Bytes::new())
+    } else {
+        let ret = unsafe { split_to_unsafe(data, len) };
+        data.advance(len);
+        ret
+    }
+}
+
+// Taken from 
https://github.com/v0y4g3r/prom-write-request-bench/blob/step6/optimize-slice/src/bytes.rs
 under Apache License 2.0.
+#[cfg(feature = "unsafe-split")]
+#[inline(always)]
+pub unsafe fn split_to_unsafe(buf: &Bytes, end: usize) -> Bytes {
+    let len = buf.len();
+    assert!(
+        end <= len,
+        "range end out of bounds: {:?} <= {:?}",
+        end,
+        len,
+    );
+
+    if end == 0 {
+        return Bytes::new();
+    }
+
+    let ptr = buf.as_ptr();
+    // `Bytes::drop` does nothing when it's built via `from_static`.
+    use std::slice;
+    Bytes::from_static(unsafe { slice::from_raw_parts(ptr, end) })
+}
+
+pub struct ProtobufReader {
+    data: Bytes,
+}
+
+impl ProtobufReader {
+    pub fn new(data: Bytes) -> Self {
+        Self { data }
+    }
+
+    pub fn remaining(&self) -> usize {
+        self.data.remaining()
+    }
+
+    /// Read a varint from the buffer.
+    ///
+    /// Similar to [quick-protobuf](https://github.com/tafia/quick-protobuf), 
unroll the loop in
+    /// [the official Go 
implementation](https://cs.opensource.google/go/go/+/refs/tags/go1.24.5:src/encoding/binary/varint.go;l=68)
+    /// for better performance.
+    #[inline(always)]
+    pub fn read_varint(&mut self) -> Result<u64> {
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        // First byte.
+        let b = self.data.get_u8();
+        if b < 0x80 {
+            return Ok(b as u64);
+        }
+        let mut x = (b & 0x7f) as u64;
+        // Second byte.
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        let b = self.data.get_u8();
+        if b < 0x80 {
+            return Ok(x | ((b as u64) << 7));
+        }
+        x |= ((b & 0x7f) as u64) << 7;
+        // Third byte.
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        let b = self.data.get_u8();
+        if b < 0x80 {
+            return Ok(x | ((b as u64) << 14));
+        }
+        x |= ((b & 0x7f) as u64) << 14;
+        // Fourth byte.
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        let b = self.data.get_u8();
+        if b < 0x80 {
+            return Ok(x | ((b as u64) << 21));
+        }
+        x |= ((b & 0x7f) as u64) << 21;
+        // Fifth byte.
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        let b = self.data.get_u8();
+        if b < 0x80 {
+            return Ok(x | ((b as u64) << 28));
+        }
+        x |= ((b & 0x7f) as u64) << 28;
+        // Sixth byte.
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        let b = self.data.get_u8();
+        if b < 0x80 {
+            return Ok(x | ((b as u64) << 35));
+        }
+        x |= ((b & 0x7f) as u64) << 35;
+        // Seventh byte.
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        let b = self.data.get_u8();
+        if b < 0x80 {
+            return Ok(x | ((b as u64) << 42));
+        }
+        x |= ((b & 0x7f) as u64) << 42;
+        // Eighth byte.
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        let b = self.data.get_u8();
+        if b < 0x80 {
+            return Ok(x | ((b as u64) << 49));
+        }
+        x |= ((b & 0x7f) as u64) << 49;
+        // Ninth byte.
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        let b = self.data.get_u8();
+        if b < 0x80 {
+            return Ok(x | ((b as u64) << 56));
+        }
+        x |= ((b & 0x7f) as u64) << 56;
+        // Tenth byte (final byte, must terminate).
+        ensure!(self.data.has_remaining(), "not enough bytes for varint");
+        let b = self.data.get_u8();
+        ensure!(b < 0x80, "varint overflow");
+        ensure!(b <= 1, "varint overflow");
+        Ok(x | ((b as u64) << 63))
+    }
+
+    /// Read a double from the buffer.
+    #[inline(always)]
+    pub fn read_double(&mut self) -> Result<f64> {
+        ensure!(self.data.remaining() >= 8, "not enough bytes for double");
+        // In Protobuf, double is encoded as 64-bit.
+        let bits = self.data.get_u64_le();
+        Ok(f64::from_bits(bits))
+    }
+
+    /// Read a 64-bit integer from the buffer.
+    #[inline(always)]
+    pub fn read_int64(&mut self) -> Result<i64> {
+        // In Protobuf, int64 is encoded as varint.
+        self.read_varint().map(|v| v as i64)
+    }
+
+    /// Read a string from the buffer.
+    pub fn read_string(&mut self) -> Result<Bytes> {
+        let len = self.read_varint()? as usize;
+        ensure!(self.data.remaining() >= len, "not enough bytes for string");
+        // In Protobuf, string is encoded as length-delimited UTF-8 bytes.
+        #[cfg(feature = "unsafe-split")]
+        let bytes = unsafe { copy_to_bytes(&mut self.data, len) };
+        #[cfg(not(feature = "unsafe-split"))]
+        let bytes = self.data.split_to(len);
+        // Leave the responsibility of validating UTF-8 to the caller,
+        // which is the practice of both 
[easyproto](https://github.com/VictoriaMetrics/easyproto)
+        // and 
[prom-write-request-bench](https://github.com/v0y4g3r/prom-write-request-bench).
+        Ok(bytes)
+    }
+
+    /// Read a tag from the buffer.
+    #[inline(always)]
+    pub fn read_tag(&mut self) -> Result<(u32, WireType)> {
+        // In Protobuf, tag is encoded as varint.
+        // tag = (field_number << 3) | wire_type.
+        let tag = self.read_varint()?;
+        let field_number = tag >> 3;
+        let wt_val = (tag & 0x07) as u8;
+        ensure!(wt_val <= 2, "unsupported wire type: {}", wt_val);
+        let wire_type = match wt_val {
+            0 => WireType::Varint,
+            1 => WireType::SixtyFourBit,
+            2 => WireType::LengthDelimited,
+            _ => unreachable!(),
+        };
+        Ok((field_number as u32, wire_type))
+    }
+
+    /// Read timeseries from the buffer.
+    #[inline(always)]
+    pub fn read_timeseries(&mut self, timeseries: &mut TimeSeries) -> 
Result<()> {
+        let len = self.read_varint()? as usize;
+        ensure!(
+            self.data.remaining() >= len,
+            "not enough bytes for timeseries"
+        );
+        let start_remaining = self.data.remaining();
+        let end_remaining = start_remaining - len;
+        while self.data.remaining() > end_remaining {
+            let (field_number, wire_type) = self.read_tag()?;
+            match field_number {
+                FIELD_NUM_LABELS => {
+                    validate_wire_type(wire_type, WireType::LengthDelimited, 
"labels")?;
+                    let label_ref = timeseries.labels.push_default();
+                    self.read_label(label_ref)?;
+                }
+                FIELD_NUM_SAMPLES => {
+                    validate_wire_type(wire_type, WireType::LengthDelimited, 
"samples")?;
+                    let sample_ref = timeseries.samples.push_default();
+                    self.read_sample(sample_ref)?;
+                }
+                FIELD_NUM_EXEMPLARS => {
+                    validate_wire_type(wire_type, WireType::LengthDelimited, 
"exemplars")?;
+                    let exemplar_ref = timeseries.exemplars.push_default();
+                    self.read_exemplar(exemplar_ref)?;
+                }
+                _ => {
+                    // Skip unknown fields instead of returning an error
+                    self.skip_field(wire_type)?;
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Read label from the buffer.
+    #[inline(always)]
+    pub fn read_label(&mut self, label: &mut Label) -> Result<()> {
+        let len = self.read_varint()? as usize;
+        ensure!(self.data.remaining() >= len, "not enough bytes for label");
+        let start_remaining = self.data.remaining();
+        let end_remaining = start_remaining - len;
+        while self.data.remaining() > end_remaining {
+            let (field_number, wire_type) = self.read_tag()?;
+            match field_number {
+                FIELD_NUM_LABEL_NAME => {
+                    validate_wire_type(wire_type, WireType::LengthDelimited, 
"label name")?;
+                    label.name = self.read_string()?;
+                }
+                FIELD_NUM_LABEL_VALUE => {
+                    validate_wire_type(wire_type, WireType::LengthDelimited, 
"label value")?;
+                    label.value = self.read_string()?;
+                }
+                _ => {
+                    self.skip_field(wire_type)?;
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Read sample from the buffer.
+    #[inline(always)]
+    pub fn read_sample(&mut self, sample: &mut Sample) -> Result<()> {
+        let len = self.read_varint()? as usize;
+        ensure!(self.data.remaining() >= len, "not enough bytes for sample");
+        let start_remaining = self.data.remaining();
+        let end_remaining = start_remaining - len;
+        while self.data.remaining() > end_remaining {
+            let (field_number, wire_type) = self.read_tag()?;
+            match field_number {
+                FIELD_NUM_SAMPLE_VALUE => {
+                    validate_wire_type(wire_type, WireType::SixtyFourBit, 
"sample value")?;
+                    sample.value = self.read_double()?;
+                }
+                FIELD_NUM_SAMPLE_TIMESTAMP => {
+                    validate_wire_type(wire_type, WireType::Varint, "sample 
timestamp")?;
+                    sample.timestamp = self.read_int64()?;
+                }
+                _ => {
+                    self.skip_field(wire_type)?;
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Read exemplar from the buffer.
+    #[inline(always)]
+    pub fn read_exemplar(&mut self, exemplar: &mut Exemplar) -> Result<()> {
+        let len = self.read_varint()? as usize;
+        ensure!(
+            self.data.remaining() >= len,
+            "not enough bytes for exemplar"
+        );
+        let start_remaining = self.data.remaining();
+        let end_remaining = start_remaining - len;
+        while self.data.remaining() > end_remaining {
+            let (field_number, wire_type) = self.read_tag()?;
+            match field_number {
+                FIELD_NUM_EXEMPLAR_LABELS => {
+                    validate_wire_type(wire_type, WireType::LengthDelimited, 
"exemplar labels")?;
+                    let label_ref = exemplar.labels.push_default();
+                    self.read_label(label_ref)?;
+                }
+                FIELD_NUM_EXEMPLAR_VALUE => {
+                    validate_wire_type(wire_type, WireType::SixtyFourBit, 
"exemplar value")?;
+                    exemplar.value = self.read_double()?;
+                }
+                FIELD_NUM_EXEMPLAR_TIMESTAMP => {
+                    validate_wire_type(wire_type, WireType::Varint, "exemplar 
timestamp")?;
+                    exemplar.timestamp = self.read_int64()?;
+                }
+                _ => {
+                    self.skip_field(wire_type)?;
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Read metric metadata from the buffer.
+    #[inline(always)]
+    pub fn read_metric_metadata(&mut self, metadata: &mut MetricMetadata) -> 
Result<()> {
+        let len = self.read_varint()? as usize;
+        ensure!(
+            self.data.remaining() >= len,
+            "not enough bytes for metadata"
+        );
+        let start_remaining = self.data.remaining();
+        let end_remaining = start_remaining - len;
+        while self.data.remaining() > end_remaining {
+            let (field_number, wire_type) = self.read_tag()?;
+            match field_number {
+                FIELD_NUM_METADATA_TYPE => {
+                    validate_wire_type(wire_type, WireType::Varint, "metadata 
type")?;
+                    let type_value = self.read_varint()? as i32;
+                    metadata.metric_type = match type_value {
+                        0 => MetricType::Unknown,
+                        1 => MetricType::Counter,
+                        2 => MetricType::Gauge,
+                        3 => MetricType::Histogram,
+                        4 => MetricType::GaugeHistogram,
+                        5 => MetricType::Summary,
+                        6 => MetricType::Info,
+                        7 => MetricType::StateSet,
+                        _ => MetricType::Unknown,
+                    };
+                }
+                FIELD_NUM_METADATA_FAMILY_NAME => {
+                    validate_wire_type(
+                        wire_type,
+                        WireType::LengthDelimited,
+                        "metadata family name",
+                    )?;
+                    metadata.metric_family_name = self.read_string()?;
+                }
+                FIELD_NUM_METADATA_HELP => {
+                    validate_wire_type(wire_type, WireType::LengthDelimited, 
"metadata help")?;
+                    metadata.help = self.read_string()?;
+                }
+                FIELD_NUM_METADATA_UNIT => {
+                    validate_wire_type(wire_type, WireType::LengthDelimited, 
"metadata unit")?;
+                    metadata.unit = self.read_string()?;
+                }
+                _ => {
+                    self.skip_field(wire_type)?;
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Skip an unknown field based on its wire type.
+    #[inline(always)]
+    pub fn skip_field(&mut self, wire_type: WireType) -> Result<()> {
+        match wire_type {
+            WireType::Varint => {
+                // For varint, read and discard the value.
+                self.read_varint()?;
+                Ok(())
+            }
+            WireType::SixtyFourBit => {
+                // For 64-bit, skip 8 bytes.
+                ensure!(
+                    self.data.remaining() >= 8,
+                    "not enough bytes to skip 64-bit field"
+                );
+                self.data.advance(8);
+                Ok(())
+            }
+            WireType::LengthDelimited => {
+                // For length-delimited, read length then skip that many bytes.
+                let len = self.read_varint()? as usize;
+                ensure!(
+                    self.data.remaining() >= len,
+                    "not enough bytes to skip length-delimited field"
+                );
+                self.data.advance(len);
+                Ok(())
+            }
+        }
+    }
+}
+
+#[inline(always)]
+fn validate_wire_type(actual: WireType, expected: WireType, field_name: &str) 
-> Result<()> {
+    ensure!(
+        actual == expected,
+        "expected wire type {:?} for {}, but found wire type {:?}",
+        expected,
+        field_name,
+        actual
+    );
+    Ok(())
+}
+
+/// Fill a [`WriteRequest`] instance with data from the buffer.
+pub fn read_write_request(data: Bytes, request: &mut WriteRequest) -> 
Result<()> {
+    let mut reader = ProtobufReader::new(data);
+    while reader.remaining() > 0 {
+        let (field_number, wire_type) = reader.read_tag()?;
+        match field_number {
+            FIELD_NUM_TIMESERIES => {
+                validate_wire_type(wire_type, WireType::LengthDelimited, 
"timeseries")?;
+                let timeseries_ref = request.timeseries.push_default();
+                reader.read_timeseries(timeseries_ref)?;
+            }
+            FIELD_NUM_METADATA => {
+                validate_wire_type(wire_type, WireType::LengthDelimited, 
"metadata")?;
+                let metadata_ref = request.metadata.push_default();
+                reader.read_metric_metadata(metadata_ref)?;
+            }
+            _ => {
+                // Skip unknown fields instead of returning an error
+                reader.skip_field(wire_type)?;
+            }
+        }
+    }
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_read_varint_single_byte() {
+        let data = &[0x42];
+        let mut reader = ProtobufReader::new(Bytes::copy_from_slice(data));
+        assert_eq!(reader.read_varint().unwrap(), 66);
+    }
+
+    #[test]
+    fn test_read_varint_multi_byte() {
+        let data = &[0x96, 0x01];
+        let mut reader = ProtobufReader::new(Bytes::copy_from_slice(data));
+        assert_eq!(reader.read_varint().unwrap(), 150);
+    }
+
+    #[test]
+    fn test_read_double() {
+        let data = &[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xF0, 0x3F];
+        let mut reader = ProtobufReader::new(Bytes::copy_from_slice(data));
+        assert_eq!(reader.read_double().unwrap(), 1.0);
+    }
+
+    #[test]
+    fn test_read_string() {
+        let data = &[0x05, b'h', b'e', b'l', b'l', b'o'];
+        let mut reader = ProtobufReader::new(Bytes::copy_from_slice(data));
+        assert_eq!(reader.read_string().unwrap(), "hello");
+    }
+
+    #[test]
+    fn test_parse_write_request() {
+        use pb_types::{
+            Exemplar, Label, MetricMetadata, Sample, TimeSeries, WriteRequest 
as PbWriteRequest,
+        };
+        use prost::Message;
+
+        let write_request = PbWriteRequest {
+            timeseries: vec![TimeSeries {
+                labels: vec![Label {
+                    name: "metric_name".to_string(),
+                    value: "test_value".to_string(),
+                }],
+                samples: vec![Sample {
+                    value: 42.5,
+                    timestamp: 1234567890,
+                }],
+                exemplars: vec![Exemplar {
+                    labels: vec![Label {
+                        name: "trace_id".to_string(),
+                        value: "abc123".to_string(),
+                    }],
+                    value: 50.0,
+                    timestamp: 1234567891,
+                }],
+            }],
+            metadata: vec![MetricMetadata {
+                r#type: 1,
+                metric_family_name: "test_metric".to_string(),
+                help: "Test metric description".to_string(),
+                unit: "bytes".to_string(),
+            }],
+        };
+
+        let encoded = write_request.encode_to_vec();
+        let data = Bytes::from(encoded);
+        let mut pooled_request = crate::pooled_types::WriteRequest::default();
+        read_write_request(data, &mut pooled_request).unwrap();
+
+        assert_eq!(pooled_request.timeseries.len(), 1);
+        let ts = &pooled_request.timeseries[0];
+        assert_eq!(ts.labels.len(), 1);
+        let label = &ts.labels[0];
+        assert_eq!(label.name, "metric_name");
+        assert_eq!(label.value, "test_value");
+        assert_eq!(ts.samples.len(), 1);
+        let sample = &ts.samples[0];
+        assert_eq!(sample.value, 42.5);
+        assert_eq!(sample.timestamp, 1234567890);
+        assert_eq!(ts.exemplars.len(), 1);
+        let exemplar = &ts.exemplars[0];
+        assert_eq!(exemplar.value, 50.0);
+        assert_eq!(exemplar.timestamp, 1234567891);
+        assert_eq!(exemplar.labels.len(), 1);
+        let exemplar_label = &exemplar.labels[0];
+        assert_eq!(exemplar_label.name, "trace_id");
+        assert_eq!(exemplar_label.value, "abc123");
+        assert_eq!(pooled_request.metadata.len(), 1);
+        let metadata = &pooled_request.metadata[0];
+        assert_eq!(metadata.metric_type, MetricType::Counter);
+        assert_eq!(metadata.metric_family_name, "test_metric");
+        assert_eq!(metadata.help, "Test metric description");
+        assert_eq!(metadata.unit, "bytes");
+    }
+}
diff --git a/src/remote_write/src/pooled_parser.rs 
b/src/remote_write/src/pooled_parser.rs
new file mode 100644
index 00000000..8b2c62fb
--- /dev/null
+++ b/src/remote_write/src/pooled_parser.rs
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pooled parser for Prometheus remote write requests.
+//!
+//! This crate parses the protobuf `string` type as Rust `Bytes` instances
+//! instead of `String` instances to avoid allocation, and it **does not**
+//! perform UTF-8 validation when parsing. Therefore, it is up to the caller to
+//! decide how to make use of the parsed `Bytes` and whether to apply UTF-8
+//! validation.
+
+use anyhow::Result;
+use bytes::Bytes;
+
+use crate::{
+    pb_reader::read_write_request,
+    pooled_types::{WriteRequest, WriteRequestManager, POOL},
+    repeated_field::Clear,
+};
+
+#[derive(Debug, Clone)]
+pub struct PooledParser;
+
+impl PooledParser {
+    fn new() -> Self {
+        Self
+    }
+
+    /// Decode a [`WriteRequest`] from the buffer and return it.
+    pub fn decode(&self, buf: Bytes) -> Result<WriteRequest> {
+        // Cannot get a WriteRequest instance from the pool in sync functions.
+        let mut request = WriteRequest::default();
+        read_write_request(buf, &mut request)?;
+        Ok(request)
+    }
+
+    /// Decode a [`WriteRequest`] from the buffer and return a pooled object.
+    ///
+    /// This method will reuse a [`WriteRequest`] instance from the object
+    /// pool. After the returned object is dropped, it will be returned to the
+    pub async fn decode_async(
+        &self,
+        buf: Bytes,
+    ) -> Result<deadpool::managed::Object<WriteRequestManager>> {
+        let mut pooled_request = POOL
+            .get()
+            .await
+            .map_err(|e| anyhow::anyhow!("failed to get object from pool: 
{e:?}"))?;
+        pooled_request.clear();
+        read_write_request(buf, &mut pooled_request)?;
+        Ok(pooled_request)
+    }
+}
+
+impl Default for PooledParser {
+    fn default() -> Self {
+        Self::new()
+    }
+}
diff --git a/src/remote_write/src/pooled_types.rs 
b/src/remote_write/src/pooled_types.rs
new file mode 100644
index 00000000..427eb607
--- /dev/null
+++ b/src/remote_write/src/pooled_types.rs
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use deadpool::managed::{Manager, Metrics, Pool, RecycleResult};
+use once_cell::sync::Lazy;
+
+use crate::repeated_field::{Clear, RepeatedField};
+
+#[derive(Debug, Clone)]
+pub struct Label {
+    pub name: Bytes,
+    pub value: Bytes,
+}
+
+impl Default for Label {
+    fn default() -> Self {
+        Self {
+            name: Bytes::new(),
+            value: Bytes::new(),
+        }
+    }
+}
+
+impl Clear for Label {
+    fn clear(&mut self) {
+        self.name.clear();
+        self.value.clear();
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct Sample {
+    pub value: f64,
+    pub timestamp: i64,
+}
+
+impl Default for Sample {
+    fn default() -> Self {
+        Self {
+            value: 0.0,
+            timestamp: 0,
+        }
+    }
+}
+
+impl Clear for Sample {
+    fn clear(&mut self) {
+        self.value = 0.0;
+        self.timestamp = 0;
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct Exemplar {
+    pub labels: RepeatedField<Label>,
+    pub value: f64,
+    pub timestamp: i64,
+}
+
+impl Default for Exemplar {
+    fn default() -> Self {
+        Self {
+            labels: RepeatedField::default(),
+            value: 0.0,
+            timestamp: 0,
+        }
+    }
+}
+
+impl Clear for Exemplar {
+    fn clear(&mut self) {
+        self.labels.clear();
+        self.value = 0.0;
+        self.timestamp = 0;
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Default)]
+pub enum MetricType {
+    #[default]
+    Unknown = 0,
+    Counter = 1,
+    Gauge = 2,
+    Histogram = 3,
+    GaugeHistogram = 4,
+    Summary = 5,
+    Info = 6,
+    StateSet = 7,
+}
+
+#[derive(Debug, Clone)]
+pub struct MetricMetadata {
+    pub metric_type: MetricType,
+    pub metric_family_name: Bytes,
+    pub help: Bytes,
+    pub unit: Bytes,
+}
+
+impl Default for MetricMetadata {
+    fn default() -> Self {
+        Self {
+            metric_type: MetricType::Unknown,
+            metric_family_name: Bytes::new(),
+            help: Bytes::new(),
+            unit: Bytes::new(),
+        }
+    }
+}
+
+impl Clear for MetricMetadata {
+    fn clear(&mut self) {
+        self.metric_type = MetricType::Unknown;
+        self.metric_family_name.clear();
+        self.help.clear();
+        self.unit.clear();
+    }
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct TimeSeries {
+    pub labels: RepeatedField<Label>,
+    pub samples: RepeatedField<Sample>,
+    pub exemplars: RepeatedField<Exemplar>,
+}
+
+impl Clear for TimeSeries {
+    fn clear(&mut self) {
+        self.labels.clear();
+        self.samples.clear();
+        self.exemplars.clear();
+    }
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct WriteRequest {
+    pub timeseries: RepeatedField<TimeSeries>,
+    pub metadata: RepeatedField<MetricMetadata>,
+}
+
+impl Clear for WriteRequest {
+    fn clear(&mut self) {
+        self.timeseries.clear();
+        self.metadata.clear();
+    }
+}
+
+/// A deadpool manager for PooledWriteRequest.
+pub struct WriteRequestManager;
+
+#[async_trait]
+impl Manager for WriteRequestManager {
+    type Error = ();
+    type Type = WriteRequest;
+
+    async fn create(&self) -> Result<Self::Type, Self::Error> {
+        Ok(WriteRequest::default())
+    }
+
+    async fn recycle(
+        &self,
+        _obj: &mut Self::Type,
+        _metrics: &Metrics,
+    ) -> RecycleResult<Self::Error> {
+        // We will reset the object after acquiring it.
+        Ok(())
+    }
+}
+
+const POOL_SIZE: usize = 64; // Maximum number of objects in the pool.
+
+pub static POOL: Lazy<Pool<WriteRequestManager>> = Lazy::new(|| {
+    Pool::builder(WriteRequestManager)
+        .max_size(POOL_SIZE)
+        .build()
+        .unwrap()
+});
diff --git a/src/remote_write/src/repeated_field.rs 
b/src/remote_write/src/repeated_field.rs
new file mode 100644
index 00000000..37a4f9d3
--- /dev/null
+++ b/src/remote_write/src/repeated_field.rs
@@ -0,0 +1,534 @@
+// Copyright (c) 2019 Stepan Koltsov
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to 
deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 
USE
+// OR OTHER DEALINGS IN THE SOFTWARE.
+
+/// ! The [Clear] trait and [RepeatedField] are taken from 
[rust-protobuf](https://github.com/stepancheg/rust-protobuf/tree/master/protobuf-examples/vs-prost)
+/// to leverage the pooling mechanism to avoid frequent heap
+/// allocation/deallocation when decoding deeply nested structs.
+use std::borrow::Borrow;
+use std::{
+    cmp::Ordering,
+    default::Default,
+    fmt,
+    hash::{Hash, Hasher},
+    iter::{FromIterator, IntoIterator},
+    ops::{Deref, DerefMut, Index, IndexMut},
+    slice, vec,
+};
+
+use bytes::Bytes;
+
+/// Anything that can be cleared.
+///
+/// Clear should reset the value to a **logically** empty state equivalent to a
+/// newly created object. It does not necessarily deallocate or zero underlying
+/// memory, which enables efficient memory reuse.
+pub trait Clear {
+    /// Clear this make, make it equivalent to newly created object.
+    fn clear(&mut self);
+}
+
+impl<T> Clear for Option<T> {
+    fn clear(&mut self) {
+        self.take();
+    }
+}
+
+impl Clear for String {
+    fn clear(&mut self) {
+        String::clear(self);
+    }
+}
+
+impl<T> Clear for Vec<T> {
+    fn clear(&mut self) {
+        Vec::clear(self);
+    }
+}
+
+impl Clear for Bytes {
+    fn clear(&mut self) {
+        Bytes::clear(self);
+    }
+}
+
+/// Wrapper around vector to avoid deallocations on clear.
+pub struct RepeatedField<T> {
+    vec: Vec<T>,
+    len: usize,
+}
+
+impl<T> RepeatedField<T> {
+    /// Return number of elements in this container.
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    /// Clear.
+    #[inline]
+    pub fn clear(&mut self) {
+        self.len = 0;
+    }
+}
+
+impl<T> Default for RepeatedField<T> {
+    #[inline]
+    fn default() -> RepeatedField<T> {
+        RepeatedField {
+            vec: Vec::new(),
+            len: 0,
+        }
+    }
+}
+
+impl<T> RepeatedField<T> {
+    /// Create new empty container.
+    #[inline]
+    pub fn new() -> RepeatedField<T> {
+        Default::default()
+    }
+
+    /// Create a contained with data from given vec.
+    #[inline]
+    pub fn from_vec(vec: Vec<T>) -> RepeatedField<T> {
+        let len = vec.len();
+        RepeatedField { vec, len }
+    }
+
+    /// Convert data into vec.
+    #[inline]
+    pub fn into_vec(self) -> Vec<T> {
+        let mut vec = self.vec;
+        vec.truncate(self.len);
+        vec
+    }
+
+    /// Return current capacity.
+    #[inline]
+    pub fn capacity(&self) -> usize {
+        self.vec.capacity()
+    }
+
+    /// View data as slice.
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        &self.vec[..self.len]
+    }
+
+    /// View data as mutable slice.
+    #[inline]
+    pub fn as_mut_slice(&mut self) -> &mut [T] {
+        &mut self.vec[..self.len]
+    }
+
+    /// Get subslice of this container.
+    #[inline]
+    pub fn slice(&self, start: usize, end: usize) -> &[T] {
+        &self.as_ref()[start..end]
+    }
+
+    /// Get mutable subslice of this container.
+    #[inline]
+    pub fn slice_mut(&mut self, start: usize, end: usize) -> &mut [T] {
+        &mut self.as_mut_slice()[start..end]
+    }
+
+    /// Get slice from given index.
+    #[inline]
+    pub fn slice_from(&self, start: usize) -> &[T] {
+        &self.as_ref()[start..]
+    }
+
+    /// Get mutable slice from given index.
+    #[inline]
+    pub fn slice_from_mut(&mut self, start: usize) -> &mut [T] {
+        &mut self.as_mut_slice()[start..]
+    }
+
+    /// Get slice to given index.
+    #[inline]
+    pub fn slice_to(&self, end: usize) -> &[T] {
+        &self.as_ref()[..end]
+    }
+
+    /// Get mutable slice to given index.
+    #[inline]
+    pub fn slice_to_mut(&mut self, end: usize) -> &mut [T] {
+        &mut self.as_mut_slice()[..end]
+    }
+
+    /// View this container as two slices split at given index.
+    #[inline]
+    pub fn split_at(&self, mid: usize) -> (&[T], &[T]) {
+        self.as_ref().split_at(mid)
+    }
+
+    /// View this container as two mutable slices split at given index.
+    #[inline]
+    pub fn split_at_mut(&mut self, mid: usize) -> (&mut [T], &mut [T]) {
+        self.as_mut_slice().split_at_mut(mid)
+    }
+
+    /// View all but first elements of this container.
+    #[inline]
+    pub fn tail(&self) -> &[T] {
+        &self.as_ref()[1..]
+    }
+
+    /// Last element of this container.
+    #[inline]
+    pub fn last(&self) -> Option<&T> {
+        self.as_ref().last()
+    }
+
+    /// Mutable last element of this container.
+    #[inline]
+    pub fn last_mut(&mut self) -> Option<&mut T> {
+        self.as_mut_slice().last_mut()
+    }
+
+    /// View all but last elements of this container.
+    #[inline]
+    pub fn init(&self) -> &[T] {
+        let s = self.as_ref();
+        &s[0..s.len() - 1]
+    }
+
+    /// Push an element to the end.
+    #[inline]
+    pub fn push(&mut self, value: T) {
+        if self.len == self.vec.len() {
+            self.vec.push(value);
+        } else {
+            self.vec[self.len] = value;
+        }
+        self.len += 1;
+    }
+
+    /// Pop last element.
+    #[inline]
+    pub fn pop(&mut self) -> Option<T> {
+        if self.len == 0 {
+            None
+        } else {
+            self.vec.truncate(self.len);
+            self.len -= 1;
+            self.vec.pop()
+        }
+    }
+
+    /// Insert an element at specified position.
+    #[inline]
+    pub fn insert(&mut self, index: usize, value: T) {
+        assert!(index <= self.len);
+        self.vec.insert(index, value);
+        self.len += 1;
+    }
+
+    /// Remove an element from specified position.
+    #[inline]
+    pub fn remove(&mut self, index: usize) -> T {
+        assert!(index < self.len);
+        self.len -= 1;
+        self.vec.remove(index)
+    }
+
+    /// Retains only the elements specified by the predicate.
+    ///
+    /// In other words, remove all elements `e` such that `f(&e)` returns
+    /// `false`. This method operates in place, visiting each element
+    /// exactly once in the original order, and preserves the order of the
+    /// retained elements.
+    ///
+    /// # Examples
+    ///
+    /// ```ignore
+    /// let mut vec = RepeatedField::from(vec![1, 2, 3, 4]);
+    /// vec.retain(|&x| x % 2 == 0);
+    /// assert_eq!(vec, RepeatedField::from(vec![2, 4]));
+    /// ```
+    pub fn retain<F>(&mut self, f: F)
+    where
+        F: FnMut(&T) -> bool,
+    {
+        // suboptimal
+        self.vec.truncate(self.len);
+        self.vec.retain(f);
+        self.len = self.vec.len();
+    }
+
+    /// Truncate at specified length.
+    #[inline]
+    pub fn truncate(&mut self, len: usize) {
+        if self.len > len {
+            self.len = len;
+        }
+    }
+
+    /// Reverse in place.
+    #[inline]
+    pub fn reverse(&mut self) {
+        self.as_mut_slice().reverse()
+    }
+
+    /// Into owned iterator.
+    #[inline]
+    pub fn into_iter(mut self) -> vec::IntoIter<T> {
+        self.vec.truncate(self.len);
+        self.vec.into_iter()
+    }
+
+    /// Immutable data iterator.
+    #[inline]
+    pub fn iter(&self) -> slice::Iter<'_, T> {
+        self.as_ref().iter()
+    }
+
+    /// Mutable data iterator.
+    #[inline]
+    pub fn iter_mut(&mut self) -> slice::IterMut<'_, T> {
+        self.as_mut_slice().iter_mut()
+    }
+
+    /// Sort elements with given comparator.
+    #[inline]
+    pub fn sort_by<F>(&mut self, compare: F)
+    where
+        F: Fn(&T, &T) -> Ordering,
+    {
+        self.as_mut_slice().sort_by(compare)
+    }
+
+    /// Get data as raw pointer.
+    #[inline]
+    pub fn as_ptr(&self) -> *const T {
+        self.vec.as_ptr()
+    }
+
+    /// Get data a mutable raw pointer.
+    #[inline]
+    pub fn as_mut_ptr(&mut self) -> *mut T {
+        self.vec.as_mut_ptr()
+    }
+}
+
+impl<T: Default + Clear> RepeatedField<T> {
+    /// Push default value.
+    /// This operation could be faster than `rf.push(Default::default())`,
+    /// because it may reuse previously allocated and cleared element.
+    pub fn push_default(&mut self) -> &mut T {
+        if self.len == self.vec.len() {
+            self.vec.push(Default::default());
+        } else {
+            self.vec[self.len].clear();
+        }
+        self.len += 1;
+        self.last_mut().unwrap()
+    }
+}
+
+impl<T> From<Vec<T>> for RepeatedField<T> {
+    #[inline]
+    fn from(values: Vec<T>) -> RepeatedField<T> {
+        RepeatedField::from_vec(values)
+    }
+}
+
+impl<'a, T: Clone> From<&'a [T]> for RepeatedField<T> {
+    #[inline]
+    fn from(values: &'a [T]) -> RepeatedField<T> {
+        RepeatedField::from_slice(values)
+    }
+}
+
+impl<T> From<RepeatedField<T>> for Vec<T> {
+    #[inline]
+    fn from(val: RepeatedField<T>) -> Self {
+        val.into_vec()
+    }
+}
+
+impl<T: Clone> RepeatedField<T> {
+    /// Copy slice data to `RepeatedField`
+    #[inline]
+    pub fn from_slice(values: &[T]) -> RepeatedField<T> {
+        RepeatedField::from_vec(values.to_vec())
+    }
+
+    /// Copy slice data to `RepeatedField`
+    #[inline]
+    pub fn from_ref<X: AsRef<[T]>>(values: X) -> RepeatedField<T> {
+        RepeatedField::from_slice(values.as_ref())
+    }
+
+    /// Copy this data into new vec.
+    #[inline]
+    pub fn to_vec(&self) -> Vec<T> {
+        self.as_ref().to_vec()
+    }
+}
+
+impl<T: Clone> Clone for RepeatedField<T> {
+    #[inline]
+    fn clone(&self) -> RepeatedField<T> {
+        RepeatedField {
+            vec: self.to_vec(),
+            len: self.len(),
+        }
+    }
+}
+
+impl<T> FromIterator<T> for RepeatedField<T> {
+    #[inline]
+    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> RepeatedField<T> {
+        RepeatedField::from_vec(FromIterator::from_iter(iter))
+    }
+}
+
+impl<'a, T> IntoIterator for &'a RepeatedField<T> {
+    type IntoIter = slice::Iter<'a, T>;
+    type Item = &'a T;
+
+    fn into_iter(self) -> slice::Iter<'a, T> {
+        self.iter()
+    }
+}
+
+impl<'a, T> IntoIterator for &'a mut RepeatedField<T> {
+    type IntoIter = slice::IterMut<'a, T>;
+    type Item = &'a mut T;
+
+    fn into_iter(self) -> slice::IterMut<'a, T> {
+        self.iter_mut()
+    }
+}
+
+impl<T> IntoIterator for RepeatedField<T> {
+    type IntoIter = vec::IntoIter<T>;
+    type Item = T;
+
+    fn into_iter(self) -> vec::IntoIter<T> {
+        RepeatedField::into_iter(self)
+    }
+}
+
+impl<T: PartialEq> PartialEq for RepeatedField<T> {
+    #[inline]
+    fn eq(&self, other: &RepeatedField<T>) -> bool {
+        self.as_ref() == other.as_ref()
+    }
+}
+
+impl<T: Eq> Eq for RepeatedField<T> {}
+
+impl<T: PartialEq> PartialEq<[T]> for RepeatedField<T> {
+    fn eq(&self, other: &[T]) -> bool {
+        self.as_slice() == other
+    }
+}
+
+impl<T: PartialEq> PartialEq<RepeatedField<T>> for [T] {
+    fn eq(&self, other: &RepeatedField<T>) -> bool {
+        self == other.as_slice()
+    }
+}
+
+impl<T: PartialEq> RepeatedField<T> {
+    /// True iff this container contains given element.
+    #[inline]
+    pub fn contains(&self, value: &T) -> bool {
+        self.as_ref().contains(value)
+    }
+}
+
+impl<T: Hash> Hash for RepeatedField<T> {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.as_ref().hash(state);
+    }
+}
+
+impl<T> AsRef<[T]> for RepeatedField<T> {
+    #[inline]
+    fn as_ref(&self) -> &[T] {
+        &self.vec[..self.len]
+    }
+}
+
+impl<T> Borrow<[T]> for RepeatedField<T> {
+    #[inline]
+    fn borrow(&self) -> &[T] {
+        &self.vec[..self.len]
+    }
+}
+
+impl<T> Deref for RepeatedField<T> {
+    type Target = [T];
+
+    #[inline]
+    fn deref(&self) -> &[T] {
+        &self.vec[..self.len]
+    }
+}
+
+impl<T> DerefMut for RepeatedField<T> {
+    #[inline]
+    fn deref_mut(&mut self) -> &mut [T] {
+        &mut self.vec[..self.len]
+    }
+}
+
+impl<T> Index<usize> for RepeatedField<T> {
+    type Output = T;
+
+    #[inline]
+    fn index(&self, index: usize) -> &T {
+        &self.as_ref()[index]
+    }
+}
+
+impl<T> IndexMut<usize> for RepeatedField<T> {
+    #[inline]
+    fn index_mut(&mut self, index: usize) -> &mut T {
+        &mut self.as_mut_slice()[index]
+    }
+}
+
+impl<T> Extend<T> for RepeatedField<T> {
+    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
+        self.vec.truncate(self.len);
+        self.vec.extend(iter);
+        self.len = self.vec.len();
+    }
+}
+
+impl<'a, T: Copy + 'a> Extend<&'a T> for RepeatedField<T> {
+    fn extend<I: IntoIterator<Item = &'a T>>(&mut self, iter: I) {
+        self.vec.truncate(self.len);
+        self.vec.extend(iter);
+        self.len = self.vec.len();
+    }
+}
+
+impl<T: fmt::Debug> fmt::Debug for RepeatedField<T> {
+    #[inline]
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        self.as_ref().fmt(f)
+    }
+}
diff --git a/src/remote_write/tests/equivalence_test.rs 
b/src/remote_write/tests/equivalence_test.rs
new file mode 100644
index 00000000..e545ecd4
--- /dev/null
+++ b/src/remote_write/tests/equivalence_test.rs
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This test is used to verify the correctness of the pooled parser rigorously
+//! (50 iterations, 25 iterations for each input data), by comparing the 
results
+//! with the prost parser.
+//!
+//! Test with `--features unsafe-split` to enable the unsafe optimization.
+
+use std::{fs, sync::Arc};
+
+use bytes::Bytes;
+use pb_types::{Exemplar, Label, MetricMetadata, Sample, TimeSeries, 
WriteRequest};
+use prost::Message;
+use remote_write::pooled_parser::PooledParser;
+use tokio::task::JoinHandle;
+
+const ITERATIONS: usize = 50;
+
+fn load_test_data() -> (Bytes, Bytes) {
+    let data1 = Bytes::from(
+        fs::read("tests/workloads/1709380533560664458.data").expect("test data 
load failed"),
+    );
+
+    let data2 = Bytes::from(
+        fs::read("tests/workloads/1709380533705807779.data").expect("test data 
load failed"),
+    );
+
+    (data1, data2)
+}
+
+fn parse_with_prost(data: &Bytes) -> WriteRequest {
+    WriteRequest::decode(data.clone()).expect("prost decode failed")
+}
+
+async fn parse_with_pooled(data: &Bytes) -> WriteRequest {
+    let data_copy = data.clone();
+    let parser = PooledParser;
+    let pooled_request = parser
+        .decode_async(data_copy)
+        .await
+        .expect("pooled decode failed");
+
+    // Convert pooled types to pb_types to compare with prost.
+    let mut write_request = WriteRequest {
+        timeseries: Vec::new(),
+        metadata: Vec::new(),
+    };
+
+    for pooled_ts in &pooled_request.timeseries {
+        let mut timeseries = TimeSeries {
+            labels: Vec::new(),
+            samples: Vec::new(),
+            exemplars: Vec::new(),
+        };
+
+        for pooled_label in &pooled_ts.labels {
+            timeseries.labels.push(Label {
+                name: String::from_utf8_lossy(&pooled_label.name).to_string(),
+                value: 
String::from_utf8_lossy(&pooled_label.value).to_string(),
+            });
+        }
+
+        for pooled_sample in &pooled_ts.samples {
+            timeseries.samples.push(Sample {
+                value: pooled_sample.value,
+                timestamp: pooled_sample.timestamp,
+            });
+        }
+
+        for pooled_exemplar in &pooled_ts.exemplars {
+            let mut exemplar = Exemplar {
+                labels: Vec::new(),
+                value: pooled_exemplar.value,
+                timestamp: pooled_exemplar.timestamp,
+            };
+
+            for pooled_label in &pooled_exemplar.labels {
+                exemplar.labels.push(Label {
+                    name: 
String::from_utf8_lossy(&pooled_label.name).to_string(),
+                    value: 
String::from_utf8_lossy(&pooled_label.value).to_string(),
+                });
+            }
+
+            timeseries.exemplars.push(exemplar);
+        }
+
+        write_request.timeseries.push(timeseries);
+    }
+
+    for pooled_metadata in pooled_request.metadata.iter() {
+        let metadata = MetricMetadata {
+            r#type: pooled_metadata.metric_type as i32,
+            metric_family_name: 
String::from_utf8_lossy(&pooled_metadata.metric_family_name)
+                .to_string(),
+            help: String::from_utf8_lossy(&pooled_metadata.help).to_string(),
+            unit: String::from_utf8_lossy(&pooled_metadata.unit).to_string(),
+        };
+
+        write_request.metadata.push(metadata);
+    }
+
+    // pooled_request will be dropped here and returned to the pool.
+    write_request
+}
+
+#[tokio::test]
+async fn test_sequential_correctness() {
+    let (data1, data2) = load_test_data();
+    let datasets = [&data1, &data2];
+
+    for iteration in 0..ITERATIONS {
+        let data_index = iteration % 2;
+        let data = datasets[data_index];
+
+        let prost_result = parse_with_prost(data);
+        let pooled_result = parse_with_pooled(data).await;
+
+        assert_eq!(
+            &prost_result, &pooled_result,
+            "Data {} WriteRequest mismatch",
+            data_index
+        );
+    }
+}
+
+#[tokio::test]
+async fn test_concurrent_correctness() {
+    let (data1, data2) = load_test_data();
+    let data1 = Arc::new(data1);
+    let data2 = Arc::new(data2);
+
+    let mut handles: Vec<JoinHandle<()>> = Vec::new();
+
+    for iteration in 0..ITERATIONS {
+        let data1_clone = Arc::clone(&data1);
+        let data2_clone = Arc::clone(&data2);
+
+        let handle = tokio::spawn(async move {
+            let data_index = iteration % 2;
+            let data = if data_index == 0 {
+                &*data1_clone
+            } else {
+                &*data2_clone
+            };
+
+            let prost_result = parse_with_prost(data);
+            let pooled_result = parse_with_pooled(data).await;
+
+            assert_eq!(
+                &prost_result, &pooled_result,
+                "Data {} WriteRequest mismatch",
+                data_index
+            );
+        });
+
+        handles.push(handle);
+    }
+
+    for handle in handles {
+        handle.await.expect("task completion failed");
+    }
+}
diff --git a/src/remote_write/tests/workloads/1709380533560664458.data 
b/src/remote_write/tests/workloads/1709380533560664458.data
new file mode 100644
index 00000000..fb3cdad5
Binary files /dev/null and 
b/src/remote_write/tests/workloads/1709380533560664458.data differ
diff --git a/src/remote_write/tests/workloads/1709380533705807779.data 
b/src/remote_write/tests/workloads/1709380533705807779.data
new file mode 100644
index 00000000..47035421
Binary files /dev/null and 
b/src/remote_write/tests/workloads/1709380533705807779.data differ


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to