This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 9cec5636 feat: add hand-written Prometheus remote write request parser
(#1628)
9cec5636 is described below
commit 9cec5636a80875a3d35a007d41cf5ee98fcf2dbb
Author: Peiyang He <[email protected]>
AuthorDate: Tue Oct 28 22:49:42 2025 +0800
feat: add hand-written Prometheus remote write request parser (#1628)
## Rationale
A hand-written remote write request parser to replace prost, with the
primary goal of achieving zero-allocation parsing.
## Detailed Changes
- The new `remote_write` directory contains the core implementation of
the hand-written parser.
- Add benchmarks.
- Add a .proto file in `pb_types`.
- Fix CI errors.
## Test Plan
- Unit tests can be found at the end of `pb_reader.rs`.
- A comprehensive equivalence test is available in
`equivalence_test.rs`, which validates the correctness of the
hand-written parser by comparing its output with that of the prost
auto-generated parser.
---
.github/workflows/ci.yml | 2 +
.gitignore | 1 +
Cargo.lock | 238 ++++++++-
Cargo.toml | 68 +--
LICENSE | 4 +-
.../assets/remote-write-concurrent-performance.png | Bin 0 -> 578747 bytes
docs/assets/remote-write-memory-performance.png | Bin 0 -> 119032 bytes
.../assets/remote-write-sequential-performance.png | Bin 0 -> 620542 bytes
licenserc.toml | 2 +
src/benchmarks/Cargo.toml | 25 +
src/benchmarks/benches/bench.rs | 107 +++-
src/benchmarks/build.rs | 106 ++++
src/benchmarks/config.toml | 5 +
src/benchmarks/remote_write_memory_bench.py | 237 +++++++++
src/benchmarks/src/bin/parser_mem.rs | 138 +++++
src/benchmarks/src/bin/pool_stats.rs | 82 +++
src/benchmarks/src/config.rs | 8 +
src/benchmarks/src/lib.rs | 15 +-
src/benchmarks/src/remote_write_bench.rs | 175 +++++++
src/benchmarks/src/util.rs | 125 +++++
src/pb_types/build.rs | 5 +-
src/pb_types/protos/remote_write.proto | 77 +++
src/pb_types/src/lib.rs | 5 +
src/{benchmarks => remote_write}/Cargo.toml | 23 +-
src/remote_write/README.md | 291 +++++++++++
src/{benchmarks => remote_write}/src/lib.rs | 9 +-
src/remote_write/src/pb_reader.rs | 565 +++++++++++++++++++++
src/remote_write/src/pooled_parser.rs | 73 +++
src/remote_write/src/pooled_types.rs | 192 +++++++
src/remote_write/src/repeated_field.rs | 534 +++++++++++++++++++
src/remote_write/tests/equivalence_test.rs | 177 +++++++
.../tests/workloads/1709380533560664458.data | Bin 0 -> 1690278 bytes
.../tests/workloads/1709380533705807779.data | Bin 0 -> 1687445 bytes
33 files changed, 3214 insertions(+), 75 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index ea01d87e..ee55523d 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -61,6 +61,7 @@ jobs:
run: |
sudo apt update
sudo apt install --yes protobuf-compiler
+ cargo install pb-rs
- name: Install check binaries
run: |
cargo install --git https://github.com/DevinR528/cargo-sort --rev
55ec890 --locked
@@ -82,6 +83,7 @@ jobs:
run: |
sudo apt update
sudo apt install --yes protobuf-compiler
+ cargo install pb-rs
- uses: Swatinem/rust-cache@v2
- name: Run Unit Tests
run: |
diff --git a/.gitignore b/.gitignore
index f3b4b08a..21aab843 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,5 +9,6 @@ integration_tests/dist_query/output
.project
.tools
bin
+!src/benchmarks/src/bin
coverage.txt
tini
diff --git a/Cargo.lock b/Cargo.lock
index 42ca6c48..a97698ea 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -546,7 +546,7 @@ dependencies = [
"memchr",
"num",
"regex",
- "regex-syntax",
+ "regex-syntax 0.8.4",
]
[[package]]
@@ -633,9 +633,19 @@ dependencies = [
"columnar_storage",
"common",
"criterion",
+ "deadpool",
+ "num_cpus",
"pb_types",
"prost",
+ "protobuf",
+ "protobuf-codegen",
+ "quick-protobuf",
+ "remote_write",
"serde",
+ "serde_json",
+ "tikv-jemalloc-ctl",
+ "tikv-jemallocator",
+ "tokio",
"toml",
"tracing",
"tracing-subscriber",
@@ -1410,7 +1420,7 @@ dependencies = [
"itertools 0.13.0",
"log",
"paste",
- "regex-syntax",
+ "regex-syntax 0.8.4",
]
[[package]]
@@ -1524,6 +1534,24 @@ dependencies = [
"strum",
]
+[[package]]
+name = "deadpool"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490"
+dependencies = [
+ "async-trait",
+ "deadpool-runtime",
+ "num_cpus",
+ "tokio",
+]
+
+[[package]]
+name = "deadpool-runtime"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b"
+
[[package]]
name = "deranged"
version = "0.3.11"
@@ -1842,6 +1870,15 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
+[[package]]
+name = "home"
+version = "0.5.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf"
+dependencies = [
+ "windows-sys 0.59.0",
+]
+
[[package]]
name = "http"
version = "0.2.12"
@@ -2152,11 +2189,11 @@ dependencies = [
[[package]]
name = "matchers"
-version = "0.2.0"
+version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
+checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
- "regex-automata",
+ "regex-automata 0.1.10",
]
[[package]]
@@ -2226,11 +2263,12 @@ checksum =
"defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
[[package]]
name = "nu-ansi-term"
-version = "0.50.1"
+version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d4a28e057d01f97e61255210fcff094d74ed0466038633e95017f5beb68e4399"
+checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
- "windows-sys 0.52.0",
+ "overload",
+ "winapi",
]
[[package]]
@@ -2383,6 +2421,12 @@ dependencies = [
"num-traits",
]
+[[package]]
+name = "overload"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
+
[[package]]
name = "parking_lot"
version = "0.12.3"
@@ -2672,6 +2716,66 @@ dependencies = [
"prost",
]
+[[package]]
+name = "protobuf"
+version = "3.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4"
+dependencies = [
+ "once_cell",
+ "protobuf-support",
+ "thiserror",
+]
+
+[[package]]
+name = "protobuf-codegen"
+version = "3.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5d3976825c0014bbd2f3b34f0001876604fe87e0c86cd8fa54251530f1544ace"
+dependencies = [
+ "anyhow",
+ "once_cell",
+ "protobuf",
+ "protobuf-parse",
+ "regex",
+ "tempfile",
+ "thiserror",
+]
+
+[[package]]
+name = "protobuf-parse"
+version = "3.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b4aeaa1f2460f1d348eeaeed86aea999ce98c1bded6f089ff8514c9d9dbdc973"
+dependencies = [
+ "anyhow",
+ "indexmap",
+ "log",
+ "protobuf",
+ "protobuf-support",
+ "tempfile",
+ "thiserror",
+ "which",
+]
+
+[[package]]
+name = "protobuf-support"
+version = "3.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6"
+dependencies = [
+ "thiserror",
+]
+
+[[package]]
+name = "quick-protobuf"
+version = "0.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9d6da84cc204722a989e01ba2f6e1e276e190f22263d0cb6ce8526fcdb0d2e1f"
+dependencies = [
+ "byteorder",
+]
+
[[package]]
name = "quote"
version = "1.0.37"
@@ -2748,8 +2852,17 @@ checksum =
"4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619"
dependencies = [
"aho-corasick",
"memchr",
- "regex-automata",
- "regex-syntax",
+ "regex-automata 0.4.7",
+ "regex-syntax 0.8.4",
+]
+
+[[package]]
+name = "regex-automata"
+version = "0.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
+dependencies = [
+ "regex-syntax 0.6.29",
]
[[package]]
@@ -2760,7 +2873,7 @@ checksum =
"38caf58cc5ef2fed281f89292ef23f6365465ed9a41b7a7754eb4e26496c92df"
dependencies = [
"aho-corasick",
"memchr",
- "regex-syntax",
+ "regex-syntax 0.8.4",
]
[[package]]
@@ -2769,12 +2882,32 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a"
+[[package]]
+name = "regex-syntax"
+version = "0.6.29"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
+
[[package]]
name = "regex-syntax"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b"
+[[package]]
+name = "remote_write"
+version = "2.2.0-alpha"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "bytes",
+ "deadpool",
+ "once_cell",
+ "pb_types",
+ "prost",
+ "tokio",
+]
+
[[package]]
name = "rustc-demangle"
version = "0.1.24"
@@ -3178,6 +3311,37 @@ dependencies = [
"ordered-float",
]
+[[package]]
+name = "tikv-jemalloc-ctl"
+version = "0.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "619bfed27d807b54f7f776b9430d4f8060e66ee138a28632ca898584d462c31c"
+dependencies = [
+ "libc",
+ "paste",
+ "tikv-jemalloc-sys",
+]
+
+[[package]]
+name = "tikv-jemalloc-sys"
+version = "0.5.4+5.3.0-patched"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9402443cb8fd499b6f327e40565234ff34dbda27460c5b47db0db77443dd85d1"
+dependencies = [
+ "cc",
+ "libc",
+]
+
+[[package]]
+name = "tikv-jemallocator"
+version = "0.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "965fe0c26be5c56c94e38ba547249074803efd52adfb66de62107d95aab3eaca"
+dependencies = [
+ "libc",
+ "tikv-jemalloc-sys",
+]
+
[[package]]
name = "time"
version = "0.3.37"
@@ -3323,9 +3487,9 @@ dependencies = [
[[package]]
name = "tracing"
-version = "0.1.41"
+version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
+checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
"log",
"pin-project-lite",
@@ -3335,9 +3499,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
-version = "0.1.30"
+version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
@@ -3346,9 +3510,9 @@ dependencies = [
[[package]]
name = "tracing-core"
-version = "0.1.34"
+version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678"
+checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
"valuable",
@@ -3367,14 +3531,14 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
-version = "0.3.20"
+version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5"
+checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
- "regex-automata",
+ "regex",
"sharded-slab",
"smallvec",
"thread_local",
@@ -3552,6 +3716,34 @@ dependencies = [
"wasm-bindgen",
]
+[[package]]
+name = "which"
+version = "4.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7"
+dependencies = [
+ "either",
+ "home",
+ "once_cell",
+ "rustix",
+]
+
+[[package]]
+name = "winapi"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
+dependencies = [
+ "winapi-i686-pc-windows-gnu",
+ "winapi-x86_64-pc-windows-gnu",
+]
+
+[[package]]
+name = "winapi-i686-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
+
[[package]]
name = "winapi-util"
version = "0.1.9"
@@ -3561,6 +3753,12 @@ dependencies = [
"windows-sys 0.59.0",
]
+[[package]]
+name = "winapi-x86_64-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
+
[[package]]
name = "windows-core"
version = "0.52.0"
diff --git a/Cargo.toml b/Cargo.toml
index 35e3a56b..b4bab12b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,60 +15,64 @@
# specific language governing permissions and limitations
# under the License.
-[workspace.package]
-version = "2.2.0-alpha"
-authors = ["Apache HoraeDB(incubating) <[email protected]>"]
-edition = "2021"
-license = "Apache-2.0"
-repository = "https://github.com/apache/horaedb"
-homepage = "https://horaedb.apache.org/"
-description = "A high-performance, distributed, cloud native time-series
database."
-
[workspace]
resolver = "2"
members = [
"src/benchmarks",
- "src/columnar_storage"
-,
+ "src/columnar_storage",
"src/common",
"src/metric_engine",
"src/pb_types",
+ "src/remote_write",
"src/server"
]
+[workspace.package]
+version = "2.2.0-alpha"
+authors = ["Apache HoraeDB(incubating) <[email protected]>"]
+edition = "2021"
+license = "Apache-2.0"
+repository = "https://github.com/apache/horaedb"
+homepage = "https://horaedb.apache.org/"
+description = "A high-performance, distributed, cloud native time-series
database."
+
[workspace.dependencies]
anyhow = { version = "1.0" }
-seahash = { version = "4" }
-metric_engine = { path = "src/metric_engine" }
+arrow = { version = "53", features = ["prettyprint"] }
+arrow-schema = "53"
+async-scoped = { version = "0.9.0", features = ["use-tokio"] }
+async-stream = "0.3"
+async-trait = "0.1"
+byteorder = "1"
+bytes = "1"
+bytesize = "1"
+clap = "4"
columnar_storage = { path = "src/columnar_storage" }
common = { path = "src/common" }
-thiserror = "1"
-bytes = "1"
-byteorder = "1"
+criterion = "0.5"
datafusion = "43"
-parquet = { version = "53" }
+deadpool = "0.10"
+futures = "0.3"
+itertools = "0.3"
+lazy_static = "1"
+metric_engine = { path = "src/metric_engine" }
object_store = { version = "0.11" }
+once_cell = "1"
+parquet = { version = "53" }
pb_types = { path = "src/pb_types" }
prost = { version = "0.13" }
-arrow = { version = "53", features = ["prettyprint"] }
-bytesize = "1"
-clap = "4"
-arrow-schema = "53"
-tokio = { version = "1", features = ["full"] }
-async-trait = "0.1"
-async-stream = "0.3"
-futures = "0.3"
+remote_write = { path = "src/remote_write" }
+seahash = { version = "4" }
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
temp-dir = "0.1"
-itertools = "0.3"
-lazy_static = "1"
+test-log = "0.2"
+thiserror = "1"
+tokio = { version = "1", features = ["full"] }
+toml = "0.8"
tracing = "0.1"
tracing-subscriber = "0.3"
-async-scoped = { version = "0.9.0", features = ["use-tokio"] }
-test-log = "0.2"
uuid = "1"
-criterion = "0.5"
-serde = { version = "1.0", features = ["derive"] }
-toml = "0.8"
# This profile optimizes for good runtime performance.
[profile.release]
diff --git a/LICENSE b/LICENSE
index 866183b7..c31a8935 100644
--- a/LICENSE
+++ b/LICENSE
@@ -237,6 +237,8 @@ The following components are provided under the MIT
License. See project link fo
The text of each license is also included in licenses/LICENSE-[project].txt
* consistent(https://github.com/buraksezer/consistent)
+* prom-write-request-bench(https://github.com/v0y4g3r/prom-write-request-bench)
Files
horaemeta/server/coordinator/scheduler/nodepicker/hash/consistent_uniform.go,
-horaemeta/server/coordinator/scheduler/nodepicker/hash/consistent_uniform_test.go
are modified from consistent.
\ No newline at end of file
+horaemeta/server/coordinator/scheduler/nodepicker/hash/consistent_uniform_test.go
are modified from consistent.
+File src/remote_write/src/repeated_field.rs from prom-write-request-bench.
\ No newline at end of file
diff --git a/docs/assets/remote-write-concurrent-performance.png
b/docs/assets/remote-write-concurrent-performance.png
new file mode 100644
index 00000000..15d185a1
Binary files /dev/null and
b/docs/assets/remote-write-concurrent-performance.png differ
diff --git a/docs/assets/remote-write-memory-performance.png
b/docs/assets/remote-write-memory-performance.png
new file mode 100644
index 00000000..c5859820
Binary files /dev/null and b/docs/assets/remote-write-memory-performance.png
differ
diff --git a/docs/assets/remote-write-sequential-performance.png
b/docs/assets/remote-write-sequential-performance.png
new file mode 100644
index 00000000..a401c285
Binary files /dev/null and
b/docs/assets/remote-write-sequential-performance.png differ
diff --git a/licenserc.toml b/licenserc.toml
index 9bc60b63..54129a8f 100644
--- a/licenserc.toml
+++ b/licenserc.toml
@@ -20,4 +20,6 @@ headerPath = "Apache-2.0-ASF.txt"
excludes = [
# Forked
"src/common/src/size_ext.rs",
+ "src/remote_write/src/repeated_field.rs",
+ "src/pb_types/protos/remote_write.proto",
]
diff --git a/src/benchmarks/Cargo.toml b/src/benchmarks/Cargo.toml
index a6c017bf..de738d01 100644
--- a/src/benchmarks/Cargo.toml
+++ b/src/benchmarks/Cargo.toml
@@ -25,19 +25,44 @@ repository.workspace = true
homepage.workspace = true
description.workspace = true
+[[bin]]
+name = "parser_mem"
+path = "src/bin/parser_mem.rs"
+
+[[bin]]
+name = "pool_stats"
+path = "src/bin/pool_stats.rs"
+
+[features]
+unsafe-split = ["remote_write/unsafe-split"]
+
[dependencies]
bytes = { workspace = true }
columnar_storage = { workspace = true }
common = { workspace = true }
+deadpool = { workspace = true }
pb_types = { workspace = true }
prost = { workspace = true }
+protobuf = "3.7"
+quick-protobuf = "0.8"
+remote_write = { workspace = true }
serde = { workspace = true }
+serde_json = { workspace = true }
+tikv-jemalloc-ctl = "0.5"
+tokio = { workspace = true }
toml = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.5"
+
+[build-dependencies]
+protobuf-codegen = "3.7"
+
[dev-dependencies]
criterion = { workspace = true }
+num_cpus = "1.16"
[[bench]]
name = "bench"
diff --git a/src/benchmarks/benches/bench.rs b/src/benchmarks/benches/bench.rs
index 5ce94444..b932a441 100644
--- a/src/benchmarks/benches/bench.rs
+++ b/src/benchmarks/benches/bench.rs
@@ -22,6 +22,7 @@ use std::{cell::RefCell, sync::Once};
use benchmarks::{
config::{self, BenchConfig},
encoding_bench::EncodingBench,
+ remote_write_bench::RemoteWriteBench,
};
use criterion::*;
@@ -56,10 +57,114 @@ fn bench_manifest_encoding(c: &mut Criterion) {
group.finish();
}
+fn bench_remote_write(c: &mut Criterion) {
+ let config = init_bench();
+
+ let sequential_scales = config.remote_write.sequential_scales.clone();
+ let concurrent_scales = config.remote_write.concurrent_scales.clone();
+ let bench = RefCell::new(RemoteWriteBench::new(config.remote_write));
+
+ let rt = tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(num_cpus::get())
+ .enable_all()
+ .build()
+ .unwrap();
+
+ // Sequential parse bench.
+ let mut group = c.benchmark_group("remote_write_sequential");
+
+ for &n in &sequential_scales {
+ group.bench_with_input(
+ BenchmarkId::new("prost", n),
+ &(&bench, n),
+ |b, (bench, scale)| {
+ let bench = bench.borrow();
+ b.iter(|| bench.prost_parser_sequential(*scale).unwrap())
+ },
+ );
+
+ group.bench_with_input(
+ BenchmarkId::new("pooled", n),
+ &(&bench, &rt, n),
+ |b, (bench, rt, scale)| {
+ let bench = bench.borrow();
+ b.iter(||
rt.block_on(bench.pooled_parser_sequential(*scale)).unwrap())
+ },
+ );
+
+ group.bench_with_input(
+ BenchmarkId::new("quick_protobuf", n),
+ &(&bench, n),
+ |b, (bench, scale)| {
+ let bench = bench.borrow();
+ b.iter(||
bench.quick_protobuf_parser_sequential(*scale).unwrap())
+ },
+ );
+
+ group.bench_with_input(
+ BenchmarkId::new("rust_protobuf", n),
+ &(&bench, n),
+ |b, (bench, scale)| {
+ let bench = bench.borrow();
+ b.iter(||
bench.rust_protobuf_parser_sequential(*scale).unwrap())
+ },
+ );
+ }
+ group.finish();
+
+ // Concurrent parse bench.
+ let mut group = c.benchmark_group("remote_write_concurrent");
+
+ for &scale in &concurrent_scales {
+ group.bench_with_input(
+ BenchmarkId::new("prost", scale),
+ &(&bench, &rt, scale),
+ |b, (bench, rt, scale)| {
+ let bench = bench.borrow();
+ b.iter(||
rt.block_on(bench.prost_parser_concurrent(*scale)).unwrap())
+ },
+ );
+
+ group.bench_with_input(
+ BenchmarkId::new("pooled", scale),
+ &(&bench, &rt, scale),
+ |b, (bench, rt, scale)| {
+ let bench = bench.borrow();
+ b.iter(||
rt.block_on(bench.pooled_parser_concurrent(*scale)).unwrap())
+ },
+ );
+
+ group.bench_with_input(
+ BenchmarkId::new("quick_protobuf", scale),
+ &(&bench, &rt, scale),
+ |b, (bench, rt, scale)| {
+ let bench = bench.borrow();
+ b.iter(|| {
+ rt.block_on(bench.quick_protobuf_parser_concurrent(*scale))
+ .unwrap()
+ })
+ },
+ );
+
+ group.bench_with_input(
+ BenchmarkId::new("rust_protobuf", scale),
+ &(&bench, &rt, scale),
+ |b, (bench, rt, scale)| {
+ let bench = bench.borrow();
+ b.iter(|| {
+ rt.block_on(bench.rust_protobuf_parser_concurrent(*scale))
+ .unwrap()
+ })
+ },
+ );
+ }
+ group.finish();
+}
+
criterion_group!(
name = benches;
config = Criterion::default();
- targets = bench_manifest_encoding,
+ targets = bench_manifest_encoding, bench_remote_write,
);
criterion_main!(benches);
diff --git a/src/benchmarks/build.rs b/src/benchmarks/build.rs
new file mode 100644
index 00000000..a2f95931
--- /dev/null
+++ b/src/benchmarks/build.rs
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{env, fs, path::PathBuf, process::Command};
+
+fn main() {
+ // Similar to prost, we generate rust-protobuf and quick-protobuf code to
+ // OUT_DIR instead of the src directory.
+ let proto_path = "../pb_types/protos/remote_write.proto";
+ let include_path = "../pb_types/protos";
+ let out_dir = env::var("OUT_DIR").unwrap();
+ let out_dir_path = PathBuf::from(&out_dir);
+
+ // Generate rust-protobuf code to OUT_DIR.
+ protobuf_codegen::Codegen::new()
+ .pure()
+ .out_dir(&out_dir)
+ .input(proto_path)
+ .include(include_path)
+ .run()
+ .expect("rust-protobuf code generation failed");
+
+ // Rename rust-protobuf generated file to avoid potential conflicts.
+ let src_file = out_dir_path.join("remote_write.rs");
+ let dst_file = out_dir_path.join("rust_protobuf_remote_write.rs");
+ fs::rename(&src_file, &dst_file).expect("rust-protobuf file rename
failed");
+
+ // Generate quick-protobuf code to OUT_DIR using pb-rs command line tool.
+ let quick_protobuf_file =
out_dir_path.join("quick_protobuf_remote_write.rs");
+ let output = Command::new("pb-rs")
+ .args([
+ "-I",
+ include_path,
+ "-o",
+ quick_protobuf_file.to_str().unwrap(),
+ "-s",
+ proto_path,
+ ])
+ .output()
+ .expect("pb-rs command execution failed");
+
+ if !output.status.success() {
+ panic!(
+ "pb-rs command execution failed: {}",
+ String::from_utf8_lossy(&output.stderr)
+ );
+ }
+
+ // Fix package namespace conflicts and inner attributes using sed.
+ let output = Command::new("sed")
+ .args([
+ "-i",
+ "-e",
+ "s/remote_write:://g",
+ "-e",
+ r"s/#!\[/#[/g",
+ "-e",
+ r"s/^\/\/! /\/\/ /g",
+ "-e",
+
"s/pb_types::mod_MetricMetadata::MetricType/mod_MetricMetadata::MetricType/g",
+ quick_protobuf_file.to_str().unwrap(),
+ ])
+ .output()
+ .expect("sed command execution failed");
+ if !output.status.success() {
+ eprintln!(
+ "warning: sed patching quick-protobuf output failed: {}",
+ String::from_utf8_lossy(&output.stderr)
+ );
+ }
+
+ // Fix inner attributes in rust-protobuf generated file.
+ let output = Command::new("sed")
+ .args([
+ "-i",
+ "-e",
+ r"s/#!\[/#[/g",
+ "-e",
+ r"s/^\/\/! /\/\/ /g",
+ dst_file.to_str().unwrap(),
+ ])
+ .output()
+ .expect("sed command execution failed");
+ if !output.status.success() {
+ eprintln!(
+ "warning: sed patching rust-protobuf output failed: {}",
+ String::from_utf8_lossy(&output.stderr)
+ );
+ }
+
+ println!("cargo:rerun-if-changed={}", proto_path);
+}
diff --git a/src/benchmarks/config.toml b/src/benchmarks/config.toml
index 1859f303..00d27006 100644
--- a/src/benchmarks/config.toml
+++ b/src/benchmarks/config.toml
@@ -20,3 +20,8 @@ record_count = 1000
append_count = 100
bench_measurement_time = "5s"
bench_sample_size = 10
+
+[remote_write]
+workload_file = "../remote_write/tests/workloads/1709380533560664458.data"
+sequential_scales = [1, 5, 10, 20, 50, 100]
+concurrent_scales = [1, 5, 10, 20, 50, 100]
diff --git a/src/benchmarks/remote_write_memory_bench.py
b/src/benchmarks/remote_write_memory_bench.py
new file mode 100644
index 00000000..42a2e502
--- /dev/null
+++ b/src/benchmarks/remote_write_memory_bench.py
@@ -0,0 +1,237 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import subprocess
+import json
+import sys
+import os
+from typing import Dict, Any
+import argparse
+
+try:
+ from tabulate import tabulate
+except ImportError:
+ print("Error: tabulate library not found.")
+ print("Please install it with: pip3 install tabulate")
+ sys.exit(1)
+
+
+class MemoryBenchmark:
+ def __init__(self, scale, mode, use_unsafe=False):
+ self.project_path = "."
+ self.data_path =
"../remote_write/tests/workloads/1709380533560664458.data"
+ self.scale = scale
+ self.mode = mode
+ self.use_unsafe = use_unsafe
+ self.parsers = [
+ "pooled",
+ "prost",
+ "rust-protobuf",
+ "quick-protobuf",
+ ]
+
+ def build_binary(self) -> bool:
+ features_msg = " with unsafe-split" if self.use_unsafe else ""
+ print(f"Building binary{features_msg}...")
+ try:
+ bin_name = "parser_mem"
+ build_cmd = ["cargo", "build", "--release", "--bin", bin_name]
+ if self.use_unsafe:
+ build_cmd.extend(["--features", "unsafe-split"])
+ result = subprocess.run(
+ build_cmd,
+ cwd=self.project_path,
+ check=False,
+ )
+ if result.returncode != 0:
+ print("Failed to build binary")
+ return False
+ return True
+ except Exception as e:
+ print(f"Failed to build binary: {e}")
+ return False
+
+ def run_parser(self, parser: str, mode: str, scale: int, bin_name: str) ->
Dict[str, Any]:
+ binary_path = f"../../target/release/{bin_name}"
+
+ cmd = [binary_path, mode, str(scale), parser]
+
+ try:
+ result = subprocess.run(
+ cmd,
+ cwd=self.project_path,
+ capture_output=True,
+ text=True,
+ timeout=300, # 5 minute timeout
+ )
+
+ if result.returncode != 0:
+ print(f"Error running {parser}: {result.stderr}")
+ return {}
+
+ return json.loads(result.stdout.strip())
+
+ except subprocess.TimeoutExpired:
+ print(f"Timeout running {parser} {mode} {scale}")
+ return {}
+ except json.JSONDecodeError as e:
+ print(f"Failed to parse JSON from {parser}: {e}")
+ print(f"Raw output: {result.stdout}")
+ return {}
+ except FileNotFoundError:
+ print(f"Binary not found: {binary_path}")
+ print(f"Please run: cargo build --release --bins")
+ return {}
+ except Exception as e:
+ print(f"Exception running {parser}: {e}")
+ return {}
+
+ def run_benchmarks(self) -> Dict[str, Dict[str, Any]]:
+ results = {}
+ successful_count = 0
+ total_count = len(self.parsers)
+
+ print(f"\nRunning benchmarks for {total_count} parsers...")
+
+ for i, parser in enumerate(self.parsers, 1):
+ print(f"\n[{i}/{total_count}] Testing {parser}...")
+ print(f"Running {self.mode} mode with scale {self.scale}...")
+ result = self.run_parser(
+ parser, self.mode, self.scale, "parser_mem")
+
+ if result:
+ result["parser"] = parser
+ results[parser] = result
+ successful_count += 1
+ print(f"Success")
+ else:
+ print(f"Failed")
+
+ print(
+ f"\nCompleted: {successful_count}/{total_count} parsers succeeded")
+
+ if successful_count == total_count:
+ print("All parsers succeeded - generating report...")
+ return results
+ else:
+ print("Some parsers failed - skipping report generation")
+ return {}
+
+ def analyze_results(self, results: Dict[str, Dict[str, Any]]):
+ if not results:
+ print("\nNo results to analyze - all parsers failed or were
skipped")
+ return
+
+ print(f"\n{'='*80}")
+ print("MEMORY BENCHMARK RESULTS")
+ print(f"{'='*80}")
+ print(f"Mode: {self.mode.upper()}, Scale: {self.scale}")
+ print()
+
+ headers = [
+ "Parser",
+ "ThreadAlloc",
+ "ThreadDealloc",
+ "Allocated",
+ "Active",
+ "Metadata",
+ "Mapped",
+ "Resident",
+ "Retained",
+ ]
+
+ table_data = []
+ for parser in self.parsers:
+ if parser in results:
+ result = results[parser]
+ memory = result.get("memory", {})
+ row = [
+ parser,
+ f"{memory.get('thread_allocated_diff', 0):,}",
+ f"{memory.get('thread_deallocated_diff', 0):,}",
+ f"{memory.get('allocated', 0):,}",
+ f"{memory.get('active', 0):,}",
+ f"{memory.get('metadata', 0):,}",
+ f"{memory.get('mapped', 0):,}",
+ f"{memory.get('resident', 0):,}",
+ f"{memory.get('retained', 0):,}",
+ ]
+ table_data.append(row)
+
+ print("SUMMARY TABLE (All values in bytes)")
+ print(
+ tabulate(
+ table_data,
+ headers=headers,
+ tablefmt="grid",
+ stralign="right",
+ numalign="right",
+ )
+ )
+
+
+def main():
+ parser = argparse.ArgumentParser(
+ description="Memory benchmark for protobuf parsers"
+ )
+ parser.add_argument(
+ "--unsafe", action="store_true", help="Enable unsafe-split feature"
+ )
+ parser.add_argument(
+ "--mode",
+ choices=["sequential", "concurrent"],
+ default="sequential",
+ help="Test mode to run (default: sequential)",
+ )
+ parser.add_argument(
+ "--scale",
+ type=int,
+ default=10,
+ help="Scale value for benchmark (default: 10)",
+ )
+
+ args = parser.parse_args()
+
+ if args.scale <= 0:
+ print(f"Invalid scale value '{args.scale}', scale must be positive")
+ sys.exit(1)
+
+ data_path = "../remote_write/tests/workloads/1709380533560664458.data"
+ if not os.path.exists(data_path):
+ print(f"Test data file not found at {data_path}")
+ print("Please ensure test data exists before running benchmarks")
+ sys.exit(1)
+
+ benchmark = MemoryBenchmark(
+ scale=args.scale, mode=args.mode, use_unsafe=args.unsafe
+ )
+
+ if not benchmark.build_binary():
+ sys.exit(1)
+
+ print(f"\nRunning memory benchmarks...")
+ print(f"Mode: {args.mode}")
+ print(f"Scale: {args.scale}")
+ print(f"Unsafe optimization: {'enabled' if args.unsafe else 'disabled'}")
+
+ results = benchmark.run_benchmarks()
+ benchmark.analyze_results(results)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/src/benchmarks/src/bin/parser_mem.rs
b/src/benchmarks/src/bin/parser_mem.rs
new file mode 100644
index 00000000..0028cd57
--- /dev/null
+++ b/src/benchmarks/src/bin/parser_mem.rs
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use benchmarks::util::{MemoryBenchConfig, MemoryStats};
+use pb_types::WriteRequest as ProstWriteRequest;
+use prost::Message;
+use protobuf::Message as ProtobufMessage;
+use quick_protobuf::{BytesReader, MessageRead};
+use remote_write::pooled_parser::PooledParser;
+use tikv_jemallocator::Jemalloc;
+
+#[global_allocator]
+static ALLOC: Jemalloc = Jemalloc;
+
+#[tokio::main(flavor = "current_thread")]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+ let config = MemoryBenchConfig::from_args();
+ let args: Vec<String> = std::env::args().collect();
+ let parser = args.get(3).map(|s| s.as_str()).unwrap_or("pooled");
+
+ let start_stats = MemoryStats::collect()?;
+
+ match config.mode.as_str() {
+ "sequential" => match parser {
+ "pooled" => {
+ let parser = PooledParser;
+ for _ in 0..config.scale {
+ let _ =
parser.decode_async(config.test_data.clone()).await?;
+ }
+ }
+ "prost" => {
+ for _ in 0..config.scale {
+ ProstWriteRequest::decode(config.test_data.clone())?;
+ }
+ }
+ "rust-protobuf" => {
+ for _ in 0..config.scale {
+ let _ =
benchmarks::rust_protobuf_remote_write::WriteRequest::parse_from_bytes(
+ &config.test_data,
+ )?;
+ }
+ }
+ "quick-protobuf" => {
+ for _ in 0..config.scale {
+ let mut reader =
BytesReader::from_bytes(&config.test_data);
+ let _ =
benchmarks::quick_protobuf_remote_write::WriteRequest::from_reader(
+ &mut reader,
+ &config.test_data,
+ )?;
+ }
+ }
+ other => panic!("unknown parser: {}", other),
+ },
+ "concurrent" => match parser {
+ "pooled" => {
+ let mut handles = Vec::new();
+ for _ in 0..config.scale {
+ let data_clone = config.test_data.clone();
+ let handle = tokio::spawn(async move {
+ let parser = PooledParser;
+ let _ = parser.decode_async(data_clone).await;
+ });
+ handles.push(handle);
+ }
+ for handle in handles {
+ handle.await?;
+ }
+ }
+ "prost" => {
+ let mut handles = Vec::new();
+ for _ in 0..config.scale {
+ let data_clone = config.test_data.clone();
+ let handle = tokio::spawn(async move {
+ let _ = ProstWriteRequest::decode(data_clone);
+ });
+ handles.push(handle);
+ }
+ for handle in handles {
+ handle.await?;
+ }
+ }
+ "rust-protobuf" => {
+ let mut handles = Vec::new();
+ for _ in 0..config.scale {
+ let data_clone = config.test_data.clone();
+ let handle = tokio::spawn(async move {
+ let _ =
+
benchmarks::rust_protobuf_remote_write::WriteRequest::parse_from_bytes(
+ &data_clone,
+ );
+ });
+ handles.push(handle);
+ }
+ for handle in handles {
+ handle.await?;
+ }
+ }
+ "quick-protobuf" => {
+ let mut handles = Vec::new();
+ for _ in 0..config.scale {
+ let data_clone = config.test_data.clone();
+ let handle = tokio::spawn(async move {
+ let mut reader = BytesReader::from_bytes(&data_clone);
+ let _ =
benchmarks::quick_protobuf_remote_write::WriteRequest::from_reader(
+ &mut reader,
+ &data_clone,
+ );
+ });
+ handles.push(handle);
+ }
+ for handle in handles {
+ handle.await?;
+ }
+ }
+ other => panic!("unknown parser: {}", other),
+ },
+ _ => panic!("invalid mode"),
+ }
+
+ let end_stats = MemoryStats::collect()?;
+ let memory_diff = start_stats.diff(&end_stats);
+ config.output_json(&memory_diff);
+ Ok(())
+}
diff --git a/src/benchmarks/src/bin/pool_stats.rs
b/src/benchmarks/src/bin/pool_stats.rs
new file mode 100644
index 00000000..48115075
--- /dev/null
+++ b/src/benchmarks/src/bin/pool_stats.rs
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Evaluate the efficiency of the deadpool-backed object pool.
+
+use std::fs;
+
+use bytes::Bytes;
+use remote_write::{pooled_parser::PooledParser, pooled_types::POOL};
+use tikv_jemallocator::Jemalloc;
+use tokio::task::JoinHandle;
+
+#[global_allocator]
+static ALLOC: Jemalloc = Jemalloc;
+
+async fn run_concurrent_parsing(scale: usize) -> deadpool::Status {
+ let data =
fs::read("../remote_write/tests/workloads/1709380533560664458.data")
+ .expect("test data load failed");
+ let data = Bytes::from(data);
+
+ let handles: Vec<JoinHandle<()>> = (0..scale)
+ .map(|_| {
+ let data = data.clone();
+ tokio::spawn(async move {
+ let parser = PooledParser;
+ let _ = parser
+ .decode_async(data.clone())
+ .await
+ .expect("parse failed");
+ })
+ })
+ .collect();
+
+ for handle in handles {
+ handle.await.expect("task completion failed");
+ }
+
+ POOL.status()
+}
+
+#[tokio::main]
+async fn main() {
+ let scale_values = [1, 2, 5, 10, 20, 50, 100, 200, 500];
+
+ println!(
+ "{:<8} {:<10} {:<10} {:<10} {:<10}",
+ "Scale", "MaxSize", "PoolSize", "Available", "Waiting"
+ );
+ println!("{}", "=".repeat(50));
+
+ for &scale in &scale_values {
+ let status = run_concurrent_parsing(scale).await;
+
+ println!(
+ "{:<8} {:<10} {:<10} {:<10} {:<10}",
+ scale, status.max_size, status.size, status.available,
status.waiting
+ );
+
+ tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+ }
+
+ println!("=== Final Pool Status ===");
+ let final_status = POOL.status();
+ println!("Max Pool Size: {}", final_status.max_size);
+ println!("Current Pool Size: {}", final_status.size);
+ println!("Available Objects: {}", final_status.available);
+ println!("Waiting Requests: {}", final_status.waiting);
+}
diff --git a/src/benchmarks/src/config.rs b/src/benchmarks/src/config.rs
index 27577f25..199ef627 100644
--- a/src/benchmarks/src/config.rs
+++ b/src/benchmarks/src/config.rs
@@ -27,6 +27,7 @@ const BENCH_CONFIG_PATH_KEY: &str = "BENCH_CONFIG_PATH";
#[derive(Debug, Deserialize)]
pub struct BenchConfig {
pub manifest: ManifestConfig,
+ pub remote_write: RemoteWriteConfig,
}
pub fn config_from_env() -> BenchConfig {
@@ -48,3 +49,10 @@ pub struct ManifestConfig {
pub bench_measurement_time: ReadableDuration,
pub bench_sample_size: usize,
}
+
+#[derive(Deserialize, Debug, Clone)]
+pub struct RemoteWriteConfig {
+ pub workload_file: String,
+ pub sequential_scales: Vec<usize>,
+ pub concurrent_scales: Vec<usize>,
+}
diff --git a/src/benchmarks/src/lib.rs b/src/benchmarks/src/lib.rs
index 180d1b8a..20d2564f 100644
--- a/src/benchmarks/src/lib.rs
+++ b/src/benchmarks/src/lib.rs
@@ -19,4 +19,17 @@
pub mod config;
pub mod encoding_bench;
-mod util;
+pub mod remote_write_bench;
+pub mod util;
+
+#[allow(clippy::all)]
+#[allow(warnings)]
+pub mod quick_protobuf_remote_write {
+ include!(concat!(env!("OUT_DIR"), "/quick_protobuf_remote_write.rs"));
+}
+
+#[allow(clippy::all)]
+#[allow(warnings)]
+pub mod rust_protobuf_remote_write {
+ include!(concat!(env!("OUT_DIR"), "/rust_protobuf_remote_write.rs"));
+}
diff --git a/src/benchmarks/src/remote_write_bench.rs
b/src/benchmarks/src/remote_write_bench.rs
new file mode 100644
index 00000000..b5372da5
--- /dev/null
+++ b/src/benchmarks/src/remote_write_bench.rs
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! remote write parser bench.
+
+use std::{fs, path::PathBuf};
+
+use bytes::Bytes;
+use pb_types::WriteRequest as ProstWriteRequest;
+use prost::Message;
+use protobuf::Message as ProtobufMessage;
+use quick_protobuf::{BytesReader, MessageRead};
+use remote_write::pooled_parser::PooledParser;
+use tokio::task::JoinHandle;
+
+use crate::{
+ config::RemoteWriteConfig,
+ quick_protobuf_remote_write::WriteRequest as QuickProtobufWriteRequest,
+ rust_protobuf_remote_write::WriteRequest as RustProtobufWriteRequest,
+};
+
+pub struct RemoteWriteBench {
+ raw_data: Vec<u8>,
+}
+
+impl RemoteWriteBench {
+ pub fn new(config: RemoteWriteConfig) -> Self {
+ let mut workload_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+ workload_path.push(&config.workload_file);
+
+ let raw_data = fs::read(&workload_path)
+ .unwrap_or_else(|_| panic!("failed to read workload file: {:?}",
workload_path));
+
+ Self { raw_data }
+ }
+
+ // prost parser sequential bench.
+ pub fn prost_parser_sequential(&self, scale: usize) -> Result<(), String> {
+ for _ in 0..scale {
+ let data = Bytes::from(self.raw_data.clone());
+ ProstWriteRequest::decode(data)
+ .map_err(|e| format!("prost sequential parse failed: {}", e))?;
+ }
+ Ok(())
+ }
+
+ // Hand-written pooled parser sequential bench.
+ pub async fn pooled_parser_sequential(&self, scale: usize) -> Result<(),
String> {
+ let parser = PooledParser;
+ for _ in 0..scale {
+ let data = Bytes::from(self.raw_data.clone());
+ let _ = parser
+ .decode_async(data.clone())
+ .await
+ .map_err(|e| format!("pooled sequential parse failed: {:?}",
e))?;
+ }
+ Ok(())
+ }
+
+ // quick-protobuf parser sequential bench.
+ pub fn quick_protobuf_parser_sequential(&self, scale: usize) -> Result<(),
String> {
+ for _ in 0..scale {
+ let mut reader = BytesReader::from_bytes(&self.raw_data);
+ QuickProtobufWriteRequest::from_reader(&mut reader, &self.raw_data)
+ .map_err(|e| format!("quick-protobuf sequential parse failed:
{}", e))?;
+ }
+ Ok(())
+ }
+
+ // rust-protobuf parser sequential bench.
+ pub fn rust_protobuf_parser_sequential(&self, scale: usize) -> Result<(),
String> {
+ for _ in 0..scale {
+ RustProtobufWriteRequest::parse_from_bytes(&self.raw_data)
+ .map_err(|e| format!("rust-protobuf sequential parse failed:
{}", e))?;
+ }
+ Ok(())
+ }
+
+ // prost parser concurrent bench.
+ pub async fn prost_parser_concurrent(&self, scale: usize) -> Result<(),
String> {
+ let join_handles: Vec<JoinHandle<Result<(), String>>> = (0..scale)
+ .map(|_| {
+ let raw_data = self.raw_data.clone();
+ tokio::spawn(async move {
+ let data = Bytes::from(raw_data);
+ ProstWriteRequest::decode(data)
+ .map_err(|e| format!("prost concurrent parse failed:
{}", e))?;
+ Ok(())
+ })
+ })
+ .collect();
+
+ for join_handle in join_handles {
+ join_handle.await.unwrap()?;
+ }
+ Ok(())
+ }
+
+ // Hand-written pooled parser concurrent bench.
+ pub async fn pooled_parser_concurrent(&self, scale: usize) -> Result<(),
String> {
+ let parser = PooledParser;
+ let join_handles: Vec<JoinHandle<Result<(), String>>> = (0..scale)
+ .map(|_| {
+ let parser = parser.clone();
+ let raw_data = self.raw_data.clone();
+ tokio::spawn(async move {
+ let data = Bytes::from(raw_data);
+ let _ = parser
+ .decode_async(data.clone())
+ .await
+ .map_err(|e| format!("pooled concurrent parse failed:
{:?}", e))?;
+ Ok(())
+ })
+ })
+ .collect();
+
+ for join_handle in join_handles {
+ join_handle.await.unwrap()?;
+ }
+ Ok(())
+ }
+
+ // quick-protobuf parser concurrent bench.
+ pub async fn quick_protobuf_parser_concurrent(&self, scale: usize) ->
Result<(), String> {
+ let join_handles: Vec<tokio::task::JoinHandle<Result<(), String>>> =
(0..scale)
+ .map(|_| {
+ let data = self.raw_data.clone();
+ tokio::spawn(async move {
+ let mut reader = BytesReader::from_bytes(&data);
+ QuickProtobufWriteRequest::from_reader(&mut reader, &data)
+ .map_err(|e| format!("quick-protobuf concurrent parse
failed: {}", e))?;
+ Ok(())
+ })
+ })
+ .collect();
+
+ for join_handle in join_handles {
+ join_handle.await.unwrap()?;
+ }
+ Ok(())
+ }
+
+ // rust-protobuf parser concurrent bench.
+ pub async fn rust_protobuf_parser_concurrent(&self, scale: usize) ->
Result<(), String> {
+ let join_handles: Vec<tokio::task::JoinHandle<Result<(), String>>> =
(0..scale)
+ .map(|_| {
+ let data = self.raw_data.clone();
+ tokio::spawn(async move {
+ RustProtobufWriteRequest::parse_from_bytes(&data)
+ .map_err(|e| format!("rust-protobuf concurrent parse
failed: {}", e))?;
+ Ok(())
+ })
+ })
+ .collect();
+
+ for join_handle in join_handles {
+ join_handle.await.unwrap()?;
+ }
+ Ok(())
+ }
+}
diff --git a/src/benchmarks/src/util.rs b/src/benchmarks/src/util.rs
index 623b5b06..4d74f785 100644
--- a/src/benchmarks/src/util.rs
+++ b/src/benchmarks/src/util.rs
@@ -18,15 +18,20 @@
//! Utilities for benchmarks.
use std::{
+ env,
fmt::{self, Write},
+ fs,
str::FromStr,
time::Duration,
};
+use bytes::Bytes;
use serde::{
de::{self, Visitor},
Deserialize, Deserializer, Serialize, Serializer,
};
+use serde_json::json;
+use tikv_jemalloc_ctl::{epoch, stats, thread};
#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Default)]
pub struct ReadableDuration(pub Duration);
@@ -168,3 +173,123 @@ impl<'de> Deserialize<'de> for ReadableDuration {
deserializer.deserialize_str(DurVisitor)
}
}
+
+// Memory bench utilities.
+#[derive(Debug, Clone)]
+pub struct MemoryStats {
+ pub thread_allocated: u64,
+ pub thread_deallocated: u64,
+ pub allocated: u64,
+ pub active: u64,
+ pub metadata: u64,
+ pub mapped: u64,
+ pub resident: u64,
+ pub retained: u64,
+}
+
+#[derive(Debug, Clone)]
+pub struct MemoryStatsDiff {
+ pub thread_allocated_diff: i64,
+ pub thread_deallocated_diff: i64,
+ pub allocated: i64,
+ pub active: i64,
+ pub metadata: i64,
+ pub mapped: i64,
+ pub resident: i64,
+ pub retained: i64,
+}
+
+impl MemoryStats {
+ pub fn collect() -> Result<Self, String> {
+ epoch::advance().map_err(|e| format!("failed to advance jemalloc
epoch: {}", e))?;
+
+ Ok(MemoryStats {
+ thread_allocated: thread::allocatedp::read()
+ .map_err(|e| format!("failed to read thread.allocatedp: {}",
e))?
+ .get(),
+ thread_deallocated: thread::deallocatedp::read()
+ .map_err(|e| format!("failed to read thread.deallocatedp: {}",
e))?
+ .get(),
+ allocated: stats::allocated::read()
+ .map_err(|e| format!("failed to read allocated: {}", e))?
+ .try_into()
+ .unwrap(),
+ active: stats::active::read()
+ .map_err(|e| format!("failed to read active: {}", e))?
+ .try_into()
+ .unwrap(),
+ metadata: stats::metadata::read()
+ .map_err(|e| format!("failed to read metadata: {}", e))?
+ .try_into()
+ .unwrap(),
+ mapped: stats::mapped::read()
+ .map_err(|e| format!("failed to read mapped: {}", e))?
+ .try_into()
+ .unwrap(),
+ resident: stats::resident::read()
+ .map_err(|e| format!("failed to read resident: {}", e))?
+ .try_into()
+ .unwrap(),
+ retained: stats::retained::read()
+ .map_err(|e| format!("failed to read retained: {}", e))?
+ .try_into()
+ .unwrap(),
+ })
+ }
+
+ pub fn diff(&self, other: &MemoryStats) -> MemoryStatsDiff {
+ MemoryStatsDiff {
+ thread_allocated_diff: other.thread_allocated as i64 -
self.thread_allocated as i64,
+ thread_deallocated_diff: other.thread_deallocated as i64
+ - self.thread_deallocated as i64,
+ allocated: other.allocated as i64 - self.allocated as i64,
+ active: other.active as i64 - self.active as i64,
+ metadata: other.metadata as i64 - self.metadata as i64,
+ mapped: other.mapped as i64 - self.mapped as i64,
+ resident: other.resident as i64 - self.resident as i64,
+ retained: other.retained as i64 - self.retained as i64,
+ }
+ }
+}
+
+pub struct MemoryBenchConfig {
+ pub test_data: Bytes,
+ pub scale: usize,
+ pub mode: String,
+}
+
+impl MemoryBenchConfig {
+ pub fn from_args() -> Self {
+ let args: Vec<String> = env::args().collect();
+ let mode = args[1].clone();
+ let scale: usize = args[2].parse().expect("invalid scale");
+ let test_data = Bytes::from(
+
fs::read("../remote_write/tests/workloads/1709380533560664458.data")
+ .expect("test data load failed"),
+ );
+
+ MemoryBenchConfig {
+ test_data,
+ scale,
+ mode,
+ }
+ }
+
+ pub fn output_json(&self, memory_diff: &MemoryStatsDiff) {
+ let result = json!({
+ "mode": self.mode,
+ "scale": self.scale,
+ "memory": {
+ "thread_allocated_diff": memory_diff.thread_allocated_diff,
+ "thread_deallocated_diff": memory_diff.thread_deallocated_diff,
+ "allocated": memory_diff.allocated,
+ "active": memory_diff.active,
+ "metadata": memory_diff.metadata,
+ "mapped": memory_diff.mapped,
+ "resident": memory_diff.resident,
+ "retained": memory_diff.retained
+ }
+ });
+ println!("{}", result);
+ }
+}
diff --git a/src/pb_types/build.rs b/src/pb_types/build.rs
index 7eb68464..1dc9a4ff 100644
--- a/src/pb_types/build.rs
+++ b/src/pb_types/build.rs
@@ -18,6 +18,9 @@
use std::io::Result;
fn main() -> Result<()> {
- prost_build::compile_protos(&["protos/sst.proto"], &["protos/"])?;
+ prost_build::compile_protos(
+ &["protos/sst.proto", "protos/remote_write.proto"],
+ &["protos/"],
+ )?;
Ok(())
}
diff --git a/src/pb_types/protos/remote_write.proto
b/src/pb_types/protos/remote_write.proto
new file mode 100644
index 00000000..110ac67b
--- /dev/null
+++ b/src/pb_types/protos/remote_write.proto
@@ -0,0 +1,77 @@
+// Copyright 2016 Prometheus Team
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// This file is modified from
+// https://github.com/prometheus/prometheus/blob/main/prompb/remote.proto
+// and https://github.com/prometheus/prometheus/blob/main/prompb/types.proto
+
+syntax = "proto3";
+package pb_types.remote_write;
+
+message WriteRequest {
+ repeated TimeSeries timeseries = 1;
+ // Cortex uses this field to determine the source of the write request.
+ // We reserve it to avoid any compatibility issues.
+ reserved 2;
+ repeated MetricMetadata metadata = 3;
+}
+
+message MetricMetadata {
+ enum MetricType {
+ UNKNOWN = 0;
+ COUNTER = 1;
+ GAUGE = 2;
+ HISTOGRAM = 3;
+ GAUGEHISTOGRAM = 4;
+ SUMMARY = 5;
+ INFO = 6;
+ STATESET = 7;
+ }
+
+ // Represents the metric type, these match the set from Prometheus.
+ // Refer to pkg/textparse/interface.go for details.
+ MetricType type = 1;
+ string metric_family_name = 2;
+ string help = 4;
+ string unit = 5;
+}
+
+// TimeSeries represents samples and labels for a single time series.
+message TimeSeries {
+ // For a timeseries to be valid, and for the samples and exemplars
+ // to be ingested by the remote system properly, the labels field is
required.
+ repeated Label labels = 1;
+ repeated Sample samples = 2;
+ repeated Exemplar exemplars = 3;
+}
+
+message Label {
+ string name = 1;
+ string value = 2;
+}
+
+message Sample {
+ double value = 1;
+ // timestamp is in ms format, see pkg/timestamp/timestamp.go for
+ // conversion from time.Time to Prometheus timestamp.
+ int64 timestamp = 2;
+}
+
+message Exemplar {
+ // Optional, can be empty.
+ repeated Label labels = 1;
+ double value = 2;
+ // timestamp is in ms format, see pkg/timestamp/timestamp.go for
+ // conversion from time.Time to Prometheus timestamp.
+ int64 timestamp = 3;
+}
\ No newline at end of file
diff --git a/src/pb_types/src/lib.rs b/src/pb_types/src/lib.rs
index bfa215b0..727a1d8f 100644
--- a/src/pb_types/src/lib.rs
+++ b/src/pb_types/src/lib.rs
@@ -19,4 +19,9 @@ mod pb_types {
include!(concat!(env!("OUT_DIR"), "/pb_types.sst.rs"));
}
+mod prost_remote_write {
+ include!(concat!(env!("OUT_DIR"), "/pb_types.remote_write.rs"));
+}
+
pub use pb_types::*;
+pub use prost_remote_write::*;
diff --git a/src/benchmarks/Cargo.toml b/src/remote_write/Cargo.toml
similarity index 77%
copy from src/benchmarks/Cargo.toml
copy to src/remote_write/Cargo.toml
index a6c017bf..344308c4 100644
--- a/src/benchmarks/Cargo.toml
+++ b/src/remote_write/Cargo.toml
@@ -16,7 +16,7 @@
# under the License.
[package]
-name = "benchmarks"
+name = "remote_write"
version.workspace = true
authors.workspace = true
edition.workspace = true
@@ -25,20 +25,15 @@ repository.workspace = true
homepage.workspace = true
description.workspace = true
+[features]
+unsafe-split = []
+
[dependencies]
+anyhow = { workspace = true }
+async-trait = { workspace = true }
bytes = { workspace = true }
-columnar_storage = { workspace = true }
-common = { workspace = true }
+deadpool = { workspace = true }
+once_cell = { workspace = true }
pb_types = { workspace = true }
prost = { workspace = true }
-serde = { workspace = true }
-toml = { workspace = true }
-tracing = { workspace = true }
-tracing-subscriber = { workspace = true }
-
-[dev-dependencies]
-criterion = { workspace = true }
-
-[[bench]]
-name = "bench"
-harness = false
+tokio = { workspace = true }
diff --git a/src/remote_write/README.md b/src/remote_write/README.md
new file mode 100644
index 00000000..f41a00c5
--- /dev/null
+++ b/src/remote_write/README.md
@@ -0,0 +1,291 @@
+# Remote Write Parser
+
+A hand-written [Prometheus Remote Write Request
(V1)](https://prometheus.io/docs/specs/prw/remote_write_spec/) parser optimized
for zero-allocation parsing. It receives protobuf data as `Bytes` and returns a
parsed `WriteRequest` instance.
+
+## Implementation
+
+Key optimization techniques:
+
+- Object pooling backed by deadpool.
+
+- `RepeatedField` data structures.
+
+- Zero-copy bytes split backed by unsafe.
+
+- Manual loop unrolling and function inline optimization.
+
+## Performance
+
+This section presents comprehensive performance analysis of the hand-written
pooled parser compared to other popular Rust protobuf libraries
([prost](https://github.com/tokio-rs/prost),
[rust-protobuf](https://github.com/stepancheg/rust-protobuf),
[quick-protobuf](https://github.com/tafia/quick-protobuf)) and the
[easyproto](https://github.com/VictoriaMetrics/easyproto) (Go) implementation.
All tests were conducted on Ubuntu 22.04.4 LTS x86_64 with AMD EPYC 7742 (8) @
2.249GHz CPU and 16GB RAM.
+
+### Prerequisites
+
+Install the required dependencies:
+
+```shell
+cargo install pb-rs
+pip3 install tabulate matplotlib
+```
+
+### CPU Time
+
+#### Steps
+
+Navigate to the benchmarks directory:
+
+```shell
+cd src/benchmarks
+```
+
+Run the standard benchmarks:
+
+```shell
+BENCH_CONFIG_PATH=config.toml cargo bench --bench bench remote_write
+```
+
+Or enable unsafe optimization for better performance:
+
+```shell
+BENCH_CONFIG_PATH=config.toml cargo bench --features unsafe-split --bench
bench remote_write
+```
+
+We also benchmarked against the
[easyproto](https://github.com/VictoriaMetrics/easyproto) library for
comparison:
+
+```shell
+git clone https://github.com/VictoriaMetrics/VictoriaMetrics.git
+cd VictoriaMetrics
+git checkout d083ff790a203ecda1cbbd527c792ef19c159f91
+cd lib/prompb
+vim prom_decode_bench_test.go
+```
+
+and add the following code (please change the path of
1709380533560664458.data):
+
+```go
+package prompb
+
+import (
+ "fmt"
+ "os"
+ "sync"
+ "testing"
+)
+
+type Decoder interface {
+ Parse(data []byte) error
+ Reset()
+ Clone() Decoder
+}
+
+type PooledDecoder struct {
+ pool *sync.Pool
+}
+
+func NewPooledDecoder() *PooledDecoder {
+ pool := &sync.Pool{
+ New: func() interface{} {
+ return &WriteRequest{}
+ },
+ }
+ return &PooledDecoder{pool: pool}
+}
+
+func (d *PooledDecoder) Parse(data []byte) error {
+ wr := d.pool.Get().(*WriteRequest)
+ defer d.pool.Put(wr)
+ wr.Reset()
+ return wr.UnmarshalProtobuf(data)
+}
+
+func (d *PooledDecoder) Reset() {
+ // Pool handles reset internally.
+}
+
+func (d *PooledDecoder) Clone() Decoder {
+ return d
+}
+
+type NoPoolDecoder struct {
+ wr *WriteRequest
+}
+
+func NewNoPoolDecoder() *NoPoolDecoder {
+ return &NoPoolDecoder{
+ wr: &WriteRequest{},
+ }
+}
+
+func (d *NoPoolDecoder) Parse(data []byte) error {
+ d.wr.Reset()
+ return d.wr.UnmarshalProtobuf(data)
+}
+
+func (d *NoPoolDecoder) Reset() {
+ d.wr.Reset()
+}
+
+func (d *NoPoolDecoder) Clone() Decoder {
+ return NewNoPoolDecoder()
+}
+
+func getTestDataPath() ([]byte, error) {
+ return os.ReadFile("1709380533560664458.data")
+}
+
+// Sequential benchmark.
+func benchDecoderSequential(decoder Decoder, data []byte, n int) error {
+ for i := 0; i < n; i++ {
+ decoder.Reset()
+ if err := decoder.Parse(data); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// Concurrent benchmark.
+func benchDecoderConcurrent(decoder Decoder, data []byte, workers int) error {
+ results := make(chan error, workers)
+
+ // Spawn workers (similar to tokio::spawn in Rust).
+ for w := 0; w < workers; w++ {
+ go func() {
+ clonedDecoder := decoder.Clone()
+ clonedDecoder.Reset()
+ err := clonedDecoder.Parse(data)
+ results <- err
+ }()
+ }
+
+ // Wait for all workers to complete (similar to join_handle.await).
+ for w := 0; w < workers; w++ {
+ if err := <-results; err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func BenchmarkSequentialParse(b *testing.B) {
+ data, err := getTestDataPath()
+ if err != nil {
+ b.Skipf("test data file not found: %v", err)
+ }
+
+ decoders := map[string]Decoder{
+ "pooled": NewPooledDecoder(),
+ "nopool": NewNoPoolDecoder(),
+ }
+
+ iterations := []int{1, 5, 10, 20, 100}
+
+ for decoderName, decoder := range decoders {
+ for _, n := range iterations {
+ b.Run(fmt.Sprintf("%s/%d", decoderName, n), func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ if err := benchDecoderSequential(decoder, data, n); err !=
nil {
+ b.Fatalf("failed to parse: %v", err)
+ }
+ }
+ })
+ }
+ }
+}
+
+func BenchmarkConcurrentParse(b *testing.B) {
+ data, err := getTestDataPath()
+ if err != nil {
+ b.Skipf("test data file not found: %v", err)
+ }
+
+ decoders := map[string]Decoder{
+ "pooled": NewPooledDecoder(),
+ "nopool": NewNoPoolDecoder(),
+ }
+
+ workers := []int{1, 5, 10, 20, 100}
+
+ for decoderName, decoder := range decoders {
+ for _, w := range workers {
+ b.Run(fmt.Sprintf("%s/%d", decoderName, w), func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ if err := benchDecoderConcurrent(decoder, data, w); err !=
nil {
+ b.Fatalf("failed to parse: %v", err)
+ }
+ }
+ })
+ }
+ }
+}
+```
+
+Execute the Go benchmarks:
+
+```shell
+go test -bench=. -run=^$
+```
+
+#### Results
+
+Test results are as follows:
+
+
+
+In sequential parsing scenarios, the hand-written pooled parsers (with and
without unsafe optimization) achieve the best performance across all scales
compared to other Rust parsers. The unsafe optimization provides nearly 50%
performance improvement.
+
+
+
+**Note**: Due to the nature of concurrent execution, concurrent parsing
benchmark results may vary (sometimes dramatically) across different runs.
However, we can still draw an overall conclusion.
+
+In concurrent parsing scenarios, from an overall perspective, the hand-written
pooled parsers (with and without unsafe optimization) still achieve the best
performance compared to other Rust parsers. The unsafe optimization continues
to provide performance improvements.
+
+### Memory Allocation
+
+Navigate to the benchmarks directory:
+
+```shell
+cd src/benchmarks
+```
+
+Run memory allocation benchmarks:
+
+```shell
+python3 remote_write_memory_bench.py --mode sequential --scale 10
+```
+
+Or run in concurrent mode with the unsafe optimization enabled:
+
+```shell
+python3 remote_write_memory_bench.py --mode concurrent --scale 10 --unsafe
+```
+
+**Note**: Sequential and concurrent mode results are similar due to the
enforced `#[tokio::main(flavor = "current_thread")]` configuration. This
constraint is necessary because Jemalloc's `thread::allocatedp` and
`thread::deallocatedp` statistics can only track single-threaded allocations
accurately.
+
+We focus on the
[allocatedp](https://docs.rs/tikv-jemalloc-ctl/0.6.0/tikv_jemalloc_ctl/thread/struct.allocatedp.html)
value to verify our zero-allocation parsing efforts, since it represents the
number of bytes that **have ever been** allocated by the thread.
+
+The results are as follows:
+
+
+
+The hand-written pooled parser allocates minimal memory compared to other Rust
parsers. Note that the difference between `ThreadAlloc` and `ThreadDealloc` in
the pooled decoder is expected since we gather statistics right before the
program terminates and objects remain in the pool (not freed) at that time.
+
+### Object Pool Efficiency
+
+Navigate to the benchmarks directory:
+
+```shell
+cd src/benchmarks
+```
+
+Analyze pool utilization:
+
+```shell
+cargo run --bin pool_stats --release
+```
+
+Our testing finds that only 8 objects in the pool are sufficient to handle 500
concurrent parsing operations.
+
+## Acknowledgements
+
+The two test data files in `src/remote_write/tests/workloads` are taken from
[prom-write-request-bench](https://github.com/v0y4g3r/prom-write-request-bench/tree/main/assets).
diff --git a/src/benchmarks/src/lib.rs b/src/remote_write/src/lib.rs
similarity index 90%
copy from src/benchmarks/src/lib.rs
copy to src/remote_write/src/lib.rs
index 180d1b8a..7b5b1573 100644
--- a/src/benchmarks/src/lib.rs
+++ b/src/remote_write/src/lib.rs
@@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! lib for benchmarks.
-
-pub mod config;
-pub mod encoding_bench;
-mod util;
+mod pb_reader;
+pub mod pooled_parser;
+pub mod pooled_types;
+mod repeated_field;
diff --git a/src/remote_write/src/pb_reader.rs
b/src/remote_write/src/pb_reader.rs
new file mode 100644
index 00000000..9545d27e
--- /dev/null
+++ b/src/remote_write/src/pb_reader.rs
@@ -0,0 +1,565 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use anyhow::{ensure, Result};
+use bytes::{Buf, Bytes};
+
+use crate::pooled_types::{
+ Exemplar, Label, MetricMetadata, MetricType, Sample, TimeSeries,
WriteRequest,
+};
+
+#[repr(u8)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub(crate) enum WireType {
+ Varint = 0,
+ SixtyFourBit = 1,
+ LengthDelimited = 2,
+}
+
+const FIELD_NUM_TIMESERIES: u32 = 1;
+const FIELD_NUM_METADATA: u32 = 3;
+const FIELD_NUM_LABELS: u32 = 1;
+const FIELD_NUM_SAMPLES: u32 = 2;
+const FIELD_NUM_EXEMPLARS: u32 = 3;
+const FIELD_NUM_LABEL_NAME: u32 = 1;
+const FIELD_NUM_LABEL_VALUE: u32 = 2;
+const FIELD_NUM_SAMPLE_VALUE: u32 = 1;
+const FIELD_NUM_SAMPLE_TIMESTAMP: u32 = 2;
+const FIELD_NUM_EXEMPLAR_LABELS: u32 = 1;
+const FIELD_NUM_EXEMPLAR_VALUE: u32 = 2;
+const FIELD_NUM_EXEMPLAR_TIMESTAMP: u32 = 3;
+const FIELD_NUM_METADATA_TYPE: u32 = 1;
+const FIELD_NUM_METADATA_FAMILY_NAME: u32 = 2;
+const FIELD_NUM_METADATA_HELP: u32 = 4;
+const FIELD_NUM_METADATA_UNIT: u32 = 5;
+
+// Taken from
https://github.com/v0y4g3r/prom-write-request-bench/blob/step6/optimize-slice/src/bytes.rs
under Apache License 2.0.
+#[cfg(feature = "unsafe-split")]
+#[inline(always)]
+unsafe fn copy_to_bytes(data: &mut Bytes, len: usize) -> Bytes {
+ if len == data.remaining() {
+ std::mem::replace(data, Bytes::new())
+ } else {
+ let ret = unsafe { split_to_unsafe(data, len) };
+ data.advance(len);
+ ret
+ }
+}
+
+// Taken from
https://github.com/v0y4g3r/prom-write-request-bench/blob/step6/optimize-slice/src/bytes.rs
under Apache License 2.0.
+#[cfg(feature = "unsafe-split")]
+#[inline(always)]
+pub unsafe fn split_to_unsafe(buf: &Bytes, end: usize) -> Bytes {
+ let len = buf.len();
+ assert!(
+ end <= len,
+ "range end out of bounds: {:?} <= {:?}",
+ end,
+ len,
+ );
+
+ if end == 0 {
+ return Bytes::new();
+ }
+
+ let ptr = buf.as_ptr();
+ // `Bytes::drop` does nothing when it's built via `from_static`.
+ use std::slice;
+ Bytes::from_static(unsafe { slice::from_raw_parts(ptr, end) })
+}
+
+pub struct ProtobufReader {
+ data: Bytes,
+}
+
+impl ProtobufReader {
+ pub fn new(data: Bytes) -> Self {
+ Self { data }
+ }
+
+ pub fn remaining(&self) -> usize {
+ self.data.remaining()
+ }
+
+ /// Read a varint from the buffer.
+ ///
+ /// Similar to [quick-protobuf](https://github.com/tafia/quick-protobuf),
unroll the loop in
+ /// [the official Go
implementation](https://cs.opensource.google/go/go/+/refs/tags/go1.24.5:src/encoding/binary/varint.go;l=68)
+ /// for better performance.
+ #[inline(always)]
+ pub fn read_varint(&mut self) -> Result<u64> {
+ ensure!(self.data.has_remaining(), "not enough bytes for varint");
+ // First byte.
+ let b = self.data.get_u8();
+ if b < 0x80 {
+ return Ok(b as u64);
+ }
+ let mut x = (b & 0x7f) as u64;
+ // Second byte.
+ ensure!(self.data.has_remaining(), "not enough bytes for varint");
+ let b = self.data.get_u8();
+ if b < 0x80 {
+ return Ok(x | ((b as u64) << 7));
+ }
+ x |= ((b & 0x7f) as u64) << 7;
+ // Third byte.
+ ensure!(self.data.has_remaining(), "not enough bytes for varint");
+ let b = self.data.get_u8();
+ if b < 0x80 {
+ return Ok(x | ((b as u64) << 14));
+ }
+ x |= ((b & 0x7f) as u64) << 14;
+ // Fourth byte.
+ ensure!(self.data.has_remaining(), "not enough bytes for varint");
+ let b = self.data.get_u8();
+ if b < 0x80 {
+ return Ok(x | ((b as u64) << 21));
+ }
+ x |= ((b & 0x7f) as u64) << 21;
+ // Fifth byte.
+ ensure!(self.data.has_remaining(), "not enough bytes for varint");
+ let b = self.data.get_u8();
+ if b < 0x80 {
+ return Ok(x | ((b as u64) << 28));
+ }
+ x |= ((b & 0x7f) as u64) << 28;
+ // Sixth byte.
+ ensure!(self.data.has_remaining(), "not enough bytes for varint");
+ let b = self.data.get_u8();
+ if b < 0x80 {
+ return Ok(x | ((b as u64) << 35));
+ }
+ x |= ((b & 0x7f) as u64) << 35;
+ // Seventh byte.
+ ensure!(self.data.has_remaining(), "not enough bytes for varint");
+ let b = self.data.get_u8();
+ if b < 0x80 {
+ return Ok(x | ((b as u64) << 42));
+ }
+ x |= ((b & 0x7f) as u64) << 42;
+ // Eighth byte.
+ ensure!(self.data.has_remaining(), "not enough bytes for varint");
+ let b = self.data.get_u8();
+ if b < 0x80 {
+ return Ok(x | ((b as u64) << 49));
+ }
+ x |= ((b & 0x7f) as u64) << 49;
+ // Ninth byte.
+ ensure!(self.data.has_remaining(), "not enough bytes for varint");
+ let b = self.data.get_u8();
+ if b < 0x80 {
+ return Ok(x | ((b as u64) << 56));
+ }
+ x |= ((b & 0x7f) as u64) << 56;
+ // Tenth byte (final byte, must terminate).
+ ensure!(self.data.has_remaining(), "not enough bytes for varint");
+ let b = self.data.get_u8();
+ ensure!(b < 0x80, "varint overflow");
+ ensure!(b <= 1, "varint overflow");
+ Ok(x | ((b as u64) << 63))
+ }
+
+ /// Read a double from the buffer.
+ #[inline(always)]
+ pub fn read_double(&mut self) -> Result<f64> {
+ ensure!(self.data.remaining() >= 8, "not enough bytes for double");
+ // In Protobuf, double is encoded as 64-bit.
+ let bits = self.data.get_u64_le();
+ Ok(f64::from_bits(bits))
+ }
+
+ /// Read a 64-bit integer from the buffer.
+ #[inline(always)]
+ pub fn read_int64(&mut self) -> Result<i64> {
+ // In Protobuf, int64 is encoded as varint.
+ self.read_varint().map(|v| v as i64)
+ }
+
+ /// Read a string from the buffer.
+ pub fn read_string(&mut self) -> Result<Bytes> {
+ let len = self.read_varint()? as usize;
+ ensure!(self.data.remaining() >= len, "not enough bytes for string");
+ // In Protobuf, string is encoded as length-delimited UTF-8 bytes.
+ #[cfg(feature = "unsafe-split")]
+ let bytes = unsafe { copy_to_bytes(&mut self.data, len) };
+ #[cfg(not(feature = "unsafe-split"))]
+ let bytes = self.data.split_to(len);
+ // Leave the responsibility of validating UTF-8 to the caller,
+ // which is the practice of both
[easyproto](https://github.com/VictoriaMetrics/easyproto)
+ // and
[prom-write-request-bench](https://github.com/v0y4g3r/prom-write-request-bench).
+ Ok(bytes)
+ }
+
+ /// Read a tag from the buffer.
+ #[inline(always)]
+ pub fn read_tag(&mut self) -> Result<(u32, WireType)> {
+ // In Protobuf, tag is encoded as varint.
+ // tag = (field_number << 3) | wire_type.
+ let tag = self.read_varint()?;
+ let field_number = tag >> 3;
+ let wt_val = (tag & 0x07) as u8;
+ ensure!(wt_val <= 2, "unsupported wire type: {}", wt_val);
+ let wire_type = match wt_val {
+ 0 => WireType::Varint,
+ 1 => WireType::SixtyFourBit,
+ 2 => WireType::LengthDelimited,
+ _ => unreachable!(),
+ };
+ Ok((field_number as u32, wire_type))
+ }
+
+ /// Read timeseries from the buffer.
+ #[inline(always)]
+ pub fn read_timeseries(&mut self, timeseries: &mut TimeSeries) ->
Result<()> {
+ let len = self.read_varint()? as usize;
+ ensure!(
+ self.data.remaining() >= len,
+ "not enough bytes for timeseries"
+ );
+ let start_remaining = self.data.remaining();
+ let end_remaining = start_remaining - len;
+ while self.data.remaining() > end_remaining {
+ let (field_number, wire_type) = self.read_tag()?;
+ match field_number {
+ FIELD_NUM_LABELS => {
+ validate_wire_type(wire_type, WireType::LengthDelimited,
"labels")?;
+ let label_ref = timeseries.labels.push_default();
+ self.read_label(label_ref)?;
+ }
+ FIELD_NUM_SAMPLES => {
+ validate_wire_type(wire_type, WireType::LengthDelimited,
"samples")?;
+ let sample_ref = timeseries.samples.push_default();
+ self.read_sample(sample_ref)?;
+ }
+ FIELD_NUM_EXEMPLARS => {
+ validate_wire_type(wire_type, WireType::LengthDelimited,
"exemplars")?;
+ let exemplar_ref = timeseries.exemplars.push_default();
+ self.read_exemplar(exemplar_ref)?;
+ }
+ _ => {
+ // Skip unknown fields instead of returning an error
+ self.skip_field(wire_type)?;
+ }
+ }
+ }
+ Ok(())
+ }
+
+ /// Read label from the buffer.
+ #[inline(always)]
+ pub fn read_label(&mut self, label: &mut Label) -> Result<()> {
+ let len = self.read_varint()? as usize;
+ ensure!(self.data.remaining() >= len, "not enough bytes for label");
+ let start_remaining = self.data.remaining();
+ let end_remaining = start_remaining - len;
+ while self.data.remaining() > end_remaining {
+ let (field_number, wire_type) = self.read_tag()?;
+ match field_number {
+ FIELD_NUM_LABEL_NAME => {
+ validate_wire_type(wire_type, WireType::LengthDelimited,
"label name")?;
+ label.name = self.read_string()?;
+ }
+ FIELD_NUM_LABEL_VALUE => {
+ validate_wire_type(wire_type, WireType::LengthDelimited,
"label value")?;
+ label.value = self.read_string()?;
+ }
+ _ => {
+ self.skip_field(wire_type)?;
+ }
+ }
+ }
+ Ok(())
+ }
+
+ /// Read sample from the buffer.
+ #[inline(always)]
+ pub fn read_sample(&mut self, sample: &mut Sample) -> Result<()> {
+ let len = self.read_varint()? as usize;
+ ensure!(self.data.remaining() >= len, "not enough bytes for sample");
+ let start_remaining = self.data.remaining();
+ let end_remaining = start_remaining - len;
+ while self.data.remaining() > end_remaining {
+ let (field_number, wire_type) = self.read_tag()?;
+ match field_number {
+ FIELD_NUM_SAMPLE_VALUE => {
+ validate_wire_type(wire_type, WireType::SixtyFourBit,
"sample value")?;
+ sample.value = self.read_double()?;
+ }
+ FIELD_NUM_SAMPLE_TIMESTAMP => {
+ validate_wire_type(wire_type, WireType::Varint, "sample
timestamp")?;
+ sample.timestamp = self.read_int64()?;
+ }
+ _ => {
+ self.skip_field(wire_type)?;
+ }
+ }
+ }
+ Ok(())
+ }
+
+ /// Read exemplar from the buffer.
+ #[inline(always)]
+ pub fn read_exemplar(&mut self, exemplar: &mut Exemplar) -> Result<()> {
+ let len = self.read_varint()? as usize;
+ ensure!(
+ self.data.remaining() >= len,
+ "not enough bytes for exemplar"
+ );
+ let start_remaining = self.data.remaining();
+ let end_remaining = start_remaining - len;
+ while self.data.remaining() > end_remaining {
+ let (field_number, wire_type) = self.read_tag()?;
+ match field_number {
+ FIELD_NUM_EXEMPLAR_LABELS => {
+ validate_wire_type(wire_type, WireType::LengthDelimited,
"exemplar labels")?;
+ let label_ref = exemplar.labels.push_default();
+ self.read_label(label_ref)?;
+ }
+ FIELD_NUM_EXEMPLAR_VALUE => {
+ validate_wire_type(wire_type, WireType::SixtyFourBit,
"exemplar value")?;
+ exemplar.value = self.read_double()?;
+ }
+ FIELD_NUM_EXEMPLAR_TIMESTAMP => {
+ validate_wire_type(wire_type, WireType::Varint, "exemplar
timestamp")?;
+ exemplar.timestamp = self.read_int64()?;
+ }
+ _ => {
+ self.skip_field(wire_type)?;
+ }
+ }
+ }
+ Ok(())
+ }
+
+ /// Read metric metadata from the buffer.
+ #[inline(always)]
+ pub fn read_metric_metadata(&mut self, metadata: &mut MetricMetadata) ->
Result<()> {
+ let len = self.read_varint()? as usize;
+ ensure!(
+ self.data.remaining() >= len,
+ "not enough bytes for metadata"
+ );
+ let start_remaining = self.data.remaining();
+ let end_remaining = start_remaining - len;
+ while self.data.remaining() > end_remaining {
+ let (field_number, wire_type) = self.read_tag()?;
+ match field_number {
+ FIELD_NUM_METADATA_TYPE => {
+ validate_wire_type(wire_type, WireType::Varint, "metadata
type")?;
+ let type_value = self.read_varint()? as i32;
+ metadata.metric_type = match type_value {
+ 0 => MetricType::Unknown,
+ 1 => MetricType::Counter,
+ 2 => MetricType::Gauge,
+ 3 => MetricType::Histogram,
+ 4 => MetricType::GaugeHistogram,
+ 5 => MetricType::Summary,
+ 6 => MetricType::Info,
+ 7 => MetricType::StateSet,
+ _ => MetricType::Unknown,
+ };
+ }
+ FIELD_NUM_METADATA_FAMILY_NAME => {
+ validate_wire_type(
+ wire_type,
+ WireType::LengthDelimited,
+ "metadata family name",
+ )?;
+ metadata.metric_family_name = self.read_string()?;
+ }
+ FIELD_NUM_METADATA_HELP => {
+ validate_wire_type(wire_type, WireType::LengthDelimited,
"metadata help")?;
+ metadata.help = self.read_string()?;
+ }
+ FIELD_NUM_METADATA_UNIT => {
+ validate_wire_type(wire_type, WireType::LengthDelimited,
"metadata unit")?;
+ metadata.unit = self.read_string()?;
+ }
+ _ => {
+ self.skip_field(wire_type)?;
+ }
+ }
+ }
+ Ok(())
+ }
+
+ /// Skip an unknown field based on its wire type.
+ #[inline(always)]
+ pub fn skip_field(&mut self, wire_type: WireType) -> Result<()> {
+ match wire_type {
+ WireType::Varint => {
+ // For varint, read and discard the value.
+ self.read_varint()?;
+ Ok(())
+ }
+ WireType::SixtyFourBit => {
+ // For 64-bit, skip 8 bytes.
+ ensure!(
+ self.data.remaining() >= 8,
+ "not enough bytes to skip 64-bit field"
+ );
+ self.data.advance(8);
+ Ok(())
+ }
+ WireType::LengthDelimited => {
+ // For length-delimited, read length then skip that many bytes.
+ let len = self.read_varint()? as usize;
+ ensure!(
+ self.data.remaining() >= len,
+ "not enough bytes to skip length-delimited field"
+ );
+ self.data.advance(len);
+ Ok(())
+ }
+ }
+ }
+}
+
+#[inline(always)]
+fn validate_wire_type(actual: WireType, expected: WireType, field_name: &str)
-> Result<()> {
+ ensure!(
+ actual == expected,
+ "expected wire type {:?} for {}, but found wire type {:?}",
+ expected,
+ field_name,
+ actual
+ );
+ Ok(())
+}
+
+/// Fill a [`WriteRequest`] instance with data from the buffer.
+pub fn read_write_request(data: Bytes, request: &mut WriteRequest) ->
Result<()> {
+ let mut reader = ProtobufReader::new(data);
+ while reader.remaining() > 0 {
+ let (field_number, wire_type) = reader.read_tag()?;
+ match field_number {
+ FIELD_NUM_TIMESERIES => {
+ validate_wire_type(wire_type, WireType::LengthDelimited,
"timeseries")?;
+ let timeseries_ref = request.timeseries.push_default();
+ reader.read_timeseries(timeseries_ref)?;
+ }
+ FIELD_NUM_METADATA => {
+ validate_wire_type(wire_type, WireType::LengthDelimited,
"metadata")?;
+ let metadata_ref = request.metadata.push_default();
+ reader.read_metric_metadata(metadata_ref)?;
+ }
+ _ => {
+ // Skip unknown fields instead of returning an error
+ reader.skip_field(wire_type)?;
+ }
+ }
+ }
+ Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_read_varint_single_byte() {
+ let data = &[0x42];
+ let mut reader = ProtobufReader::new(Bytes::copy_from_slice(data));
+ assert_eq!(reader.read_varint().unwrap(), 66);
+ }
+
+ #[test]
+ fn test_read_varint_multi_byte() {
+ let data = &[0x96, 0x01];
+ let mut reader = ProtobufReader::new(Bytes::copy_from_slice(data));
+ assert_eq!(reader.read_varint().unwrap(), 150);
+ }
+
+ #[test]
+ fn test_read_double() {
+ let data = &[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xF0, 0x3F];
+ let mut reader = ProtobufReader::new(Bytes::copy_from_slice(data));
+ assert_eq!(reader.read_double().unwrap(), 1.0);
+ }
+
+ #[test]
+ fn test_read_string() {
+ let data = &[0x05, b'h', b'e', b'l', b'l', b'o'];
+ let mut reader = ProtobufReader::new(Bytes::copy_from_slice(data));
+ assert_eq!(reader.read_string().unwrap(), "hello");
+ }
+
+ #[test]
+ fn test_parse_write_request() {
+ use pb_types::{
+ Exemplar, Label, MetricMetadata, Sample, TimeSeries, WriteRequest
as PbWriteRequest,
+ };
+ use prost::Message;
+
+ let write_request = PbWriteRequest {
+ timeseries: vec![TimeSeries {
+ labels: vec![Label {
+ name: "metric_name".to_string(),
+ value: "test_value".to_string(),
+ }],
+ samples: vec![Sample {
+ value: 42.5,
+ timestamp: 1234567890,
+ }],
+ exemplars: vec![Exemplar {
+ labels: vec![Label {
+ name: "trace_id".to_string(),
+ value: "abc123".to_string(),
+ }],
+ value: 50.0,
+ timestamp: 1234567891,
+ }],
+ }],
+ metadata: vec![MetricMetadata {
+ r#type: 1,
+ metric_family_name: "test_metric".to_string(),
+ help: "Test metric description".to_string(),
+ unit: "bytes".to_string(),
+ }],
+ };
+
+ let encoded = write_request.encode_to_vec();
+ let data = Bytes::from(encoded);
+ let mut pooled_request = crate::pooled_types::WriteRequest::default();
+ read_write_request(data, &mut pooled_request).unwrap();
+
+ assert_eq!(pooled_request.timeseries.len(), 1);
+ let ts = &pooled_request.timeseries[0];
+ assert_eq!(ts.labels.len(), 1);
+ let label = &ts.labels[0];
+ assert_eq!(label.name, "metric_name");
+ assert_eq!(label.value, "test_value");
+ assert_eq!(ts.samples.len(), 1);
+ let sample = &ts.samples[0];
+ assert_eq!(sample.value, 42.5);
+ assert_eq!(sample.timestamp, 1234567890);
+ assert_eq!(ts.exemplars.len(), 1);
+ let exemplar = &ts.exemplars[0];
+ assert_eq!(exemplar.value, 50.0);
+ assert_eq!(exemplar.timestamp, 1234567891);
+ assert_eq!(exemplar.labels.len(), 1);
+ let exemplar_label = &exemplar.labels[0];
+ assert_eq!(exemplar_label.name, "trace_id");
+ assert_eq!(exemplar_label.value, "abc123");
+ assert_eq!(pooled_request.metadata.len(), 1);
+ let metadata = &pooled_request.metadata[0];
+ assert_eq!(metadata.metric_type, MetricType::Counter);
+ assert_eq!(metadata.metric_family_name, "test_metric");
+ assert_eq!(metadata.help, "Test metric description");
+ assert_eq!(metadata.unit, "bytes");
+ }
+}
diff --git a/src/remote_write/src/pooled_parser.rs
b/src/remote_write/src/pooled_parser.rs
new file mode 100644
index 00000000..8b2c62fb
--- /dev/null
+++ b/src/remote_write/src/pooled_parser.rs
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pooled parser for Prometheus remote write requests.
+//!
+//! This module parses the protobuf `string` type as Rust `Bytes` instances
+//! instead of `String` instances to avoid allocation, and it **does not**
+//! perform UTF-8 validation when parsing. Therefore, it is up to the caller to
+//! decide how to make use of the parsed `Bytes` and whether to apply UTF-8
+//! validation.
+
+use anyhow::Result;
+use bytes::Bytes;
+
+use crate::{
+ pb_reader::read_write_request,
+ pooled_types::{WriteRequest, WriteRequestManager, POOL},
+ repeated_field::Clear,
+};
+
+#[derive(Debug, Clone)]
+pub struct PooledParser;
+
+impl PooledParser {
+ fn new() -> Self {
+ Self
+ }
+
+ /// Decode a [`WriteRequest`] from the buffer and return it.
+ pub fn decode(&self, buf: Bytes) -> Result<WriteRequest> {
+ // Cannot get a WriteRequest instance from the pool in sync functions.
+ let mut request = WriteRequest::default();
+ read_write_request(buf, &mut request)?;
+ Ok(request)
+ }
+
+ /// Decode a [`WriteRequest`] from the buffer and return a pooled object.
+ ///
+ /// This method will reuse a [`WriteRequest`] instance from the object
+ /// pool. After the returned object is dropped, it is returned to the pool.
+ pub async fn decode_async(
+ &self,
+ buf: Bytes,
+ ) -> Result<deadpool::managed::Object<WriteRequestManager>> {
+ let mut pooled_request = POOL
+ .get()
+ .await
+ .map_err(|e| anyhow::anyhow!("failed to get object from pool:
{e:?}"))?;
+ pooled_request.clear();
+ read_write_request(buf, &mut pooled_request)?;
+ Ok(pooled_request)
+ }
+}
+
+impl Default for PooledParser {
+ fn default() -> Self {
+ Self::new()
+ }
+}
diff --git a/src/remote_write/src/pooled_types.rs
b/src/remote_write/src/pooled_types.rs
new file mode 100644
index 00000000..427eb607
--- /dev/null
+++ b/src/remote_write/src/pooled_types.rs
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use deadpool::managed::{Manager, Metrics, Pool, RecycleResult};
+use once_cell::sync::Lazy;
+
+use crate::repeated_field::{Clear, RepeatedField};
+
+#[derive(Debug, Clone)]
+pub struct Label {
+ pub name: Bytes,
+ pub value: Bytes,
+}
+
+impl Default for Label {
+ fn default() -> Self {
+ Self {
+ name: Bytes::new(),
+ value: Bytes::new(),
+ }
+ }
+}
+
+impl Clear for Label {
+ fn clear(&mut self) {
+ self.name.clear();
+ self.value.clear();
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct Sample {
+ pub value: f64,
+ pub timestamp: i64,
+}
+
+impl Default for Sample {
+ fn default() -> Self {
+ Self {
+ value: 0.0,
+ timestamp: 0,
+ }
+ }
+}
+
+impl Clear for Sample {
+ fn clear(&mut self) {
+ self.value = 0.0;
+ self.timestamp = 0;
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct Exemplar {
+ pub labels: RepeatedField<Label>,
+ pub value: f64,
+ pub timestamp: i64,
+}
+
+impl Default for Exemplar {
+ fn default() -> Self {
+ Self {
+ labels: RepeatedField::default(),
+ value: 0.0,
+ timestamp: 0,
+ }
+ }
+}
+
+impl Clear for Exemplar {
+ fn clear(&mut self) {
+ self.labels.clear();
+ self.value = 0.0;
+ self.timestamp = 0;
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Default)]
+pub enum MetricType {
+ #[default]
+ Unknown = 0,
+ Counter = 1,
+ Gauge = 2,
+ Histogram = 3,
+ GaugeHistogram = 4,
+ Summary = 5,
+ Info = 6,
+ StateSet = 7,
+}
+
+#[derive(Debug, Clone)]
+pub struct MetricMetadata {
+ pub metric_type: MetricType,
+ pub metric_family_name: Bytes,
+ pub help: Bytes,
+ pub unit: Bytes,
+}
+
+impl Default for MetricMetadata {
+ fn default() -> Self {
+ Self {
+ metric_type: MetricType::Unknown,
+ metric_family_name: Bytes::new(),
+ help: Bytes::new(),
+ unit: Bytes::new(),
+ }
+ }
+}
+
+impl Clear for MetricMetadata {
+ fn clear(&mut self) {
+ self.metric_type = MetricType::Unknown;
+ self.metric_family_name.clear();
+ self.help.clear();
+ self.unit.clear();
+ }
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct TimeSeries {
+ pub labels: RepeatedField<Label>,
+ pub samples: RepeatedField<Sample>,
+ pub exemplars: RepeatedField<Exemplar>,
+}
+
+impl Clear for TimeSeries {
+ fn clear(&mut self) {
+ self.labels.clear();
+ self.samples.clear();
+ self.exemplars.clear();
+ }
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct WriteRequest {
+ pub timeseries: RepeatedField<TimeSeries>,
+ pub metadata: RepeatedField<MetricMetadata>,
+}
+
+impl Clear for WriteRequest {
+ fn clear(&mut self) {
+ self.timeseries.clear();
+ self.metadata.clear();
+ }
+}
+
+/// A deadpool manager for [`WriteRequest`] objects.
+pub struct WriteRequestManager;
+
+#[async_trait]
+impl Manager for WriteRequestManager {
+ type Error = ();
+ type Type = WriteRequest;
+
+ async fn create(&self) -> Result<Self::Type, Self::Error> {
+ Ok(WriteRequest::default())
+ }
+
+ async fn recycle(
+ &self,
+ _obj: &mut Self::Type,
+ _metrics: &Metrics,
+ ) -> RecycleResult<Self::Error> {
+ // We will reset the object after acquiring it.
+ Ok(())
+ }
+}
+
+const POOL_SIZE: usize = 64; // Maximum number of objects in the pool.
+
+pub static POOL: Lazy<Pool<WriteRequestManager>> = Lazy::new(|| {
+ Pool::builder(WriteRequestManager)
+ .max_size(POOL_SIZE)
+ .build()
+ .unwrap()
+});
diff --git a/src/remote_write/src/repeated_field.rs
b/src/remote_write/src/repeated_field.rs
new file mode 100644
index 00000000..37a4f9d3
--- /dev/null
+++ b/src/remote_write/src/repeated_field.rs
@@ -0,0 +1,534 @@
+// Copyright (c) 2019 Stepan Koltsov
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to
deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
USE
+// OR OTHER DEALINGS IN THE SOFTWARE.
+
+//! The [Clear] trait and [RepeatedField] are taken from
[rust-protobuf](https://github.com/stepancheg/rust-protobuf/tree/master/protobuf-examples/vs-prost)
+//! to leverage the pooling mechanism to avoid frequent heap
+//! allocation/deallocation when decoding deeply nested structs.
+use std::borrow::Borrow;
+use std::{
+ cmp::Ordering,
+ default::Default,
+ fmt,
+ hash::{Hash, Hasher},
+ iter::{FromIterator, IntoIterator},
+ ops::{Deref, DerefMut, Index, IndexMut},
+ slice, vec,
+};
+
+use bytes::Bytes;
+
+/// Anything that can be cleared.
+///
+/// Clear should reset the value to a **logically** empty state equivalent to a
+/// newly created object. It does not necessarily deallocate or zero underlying
+/// memory, which enables efficient memory reuse.
+pub trait Clear {
+ /// Clear this value, making it equivalent to a newly created object.
+ fn clear(&mut self);
+}
+
+impl<T> Clear for Option<T> {
+ fn clear(&mut self) {
+ self.take();
+ }
+}
+
+impl Clear for String {
+ fn clear(&mut self) {
+ String::clear(self);
+ }
+}
+
+impl<T> Clear for Vec<T> {
+ fn clear(&mut self) {
+ Vec::clear(self);
+ }
+}
+
+impl Clear for Bytes {
+ fn clear(&mut self) {
+ Bytes::clear(self);
+ }
+}
+
+/// Wrapper around vector to avoid deallocations on clear.
+pub struct RepeatedField<T> {
+ vec: Vec<T>,
+ len: usize,
+}
+
+impl<T> RepeatedField<T> {
+ /// Return number of elements in this container.
+ #[inline]
+ pub fn len(&self) -> usize {
+ self.len
+ }
+
+ /// Clear.
+ #[inline]
+ pub fn clear(&mut self) {
+ self.len = 0;
+ }
+}
+
+impl<T> Default for RepeatedField<T> {
+ #[inline]
+ fn default() -> RepeatedField<T> {
+ RepeatedField {
+ vec: Vec::new(),
+ len: 0,
+ }
+ }
+}
+
+impl<T> RepeatedField<T> {
+ /// Create new empty container.
+ #[inline]
+ pub fn new() -> RepeatedField<T> {
+ Default::default()
+ }
+
+ /// Create a container with data from the given vec.
+ #[inline]
+ pub fn from_vec(vec: Vec<T>) -> RepeatedField<T> {
+ let len = vec.len();
+ RepeatedField { vec, len }
+ }
+
+ /// Convert data into vec.
+ #[inline]
+ pub fn into_vec(self) -> Vec<T> {
+ let mut vec = self.vec;
+ vec.truncate(self.len);
+ vec
+ }
+
+ /// Return current capacity.
+ #[inline]
+ pub fn capacity(&self) -> usize {
+ self.vec.capacity()
+ }
+
+ /// View data as slice.
+ #[inline]
+ pub fn as_slice(&self) -> &[T] {
+ &self.vec[..self.len]
+ }
+
+ /// View data as mutable slice.
+ #[inline]
+ pub fn as_mut_slice(&mut self) -> &mut [T] {
+ &mut self.vec[..self.len]
+ }
+
+ /// Get subslice of this container.
+ #[inline]
+ pub fn slice(&self, start: usize, end: usize) -> &[T] {
+ &self.as_ref()[start..end]
+ }
+
+ /// Get mutable subslice of this container.
+ #[inline]
+ pub fn slice_mut(&mut self, start: usize, end: usize) -> &mut [T] {
+ &mut self.as_mut_slice()[start..end]
+ }
+
+ /// Get slice from given index.
+ #[inline]
+ pub fn slice_from(&self, start: usize) -> &[T] {
+ &self.as_ref()[start..]
+ }
+
+ /// Get mutable slice from given index.
+ #[inline]
+ pub fn slice_from_mut(&mut self, start: usize) -> &mut [T] {
+ &mut self.as_mut_slice()[start..]
+ }
+
+ /// Get slice to given index.
+ #[inline]
+ pub fn slice_to(&self, end: usize) -> &[T] {
+ &self.as_ref()[..end]
+ }
+
+ /// Get mutable slice to given index.
+ #[inline]
+ pub fn slice_to_mut(&mut self, end: usize) -> &mut [T] {
+ &mut self.as_mut_slice()[..end]
+ }
+
+ /// View this container as two slices split at given index.
+ #[inline]
+ pub fn split_at(&self, mid: usize) -> (&[T], &[T]) {
+ self.as_ref().split_at(mid)
+ }
+
+ /// View this container as two mutable slices split at given index.
+ #[inline]
+ pub fn split_at_mut(&mut self, mid: usize) -> (&mut [T], &mut [T]) {
+ self.as_mut_slice().split_at_mut(mid)
+ }
+
+ /// View all but first elements of this container.
+ #[inline]
+ pub fn tail(&self) -> &[T] {
+ &self.as_ref()[1..]
+ }
+
+ /// Last element of this container.
+ #[inline]
+ pub fn last(&self) -> Option<&T> {
+ self.as_ref().last()
+ }
+
+ /// Mutable last element of this container.
+ #[inline]
+ pub fn last_mut(&mut self) -> Option<&mut T> {
+ self.as_mut_slice().last_mut()
+ }
+
+ /// View all but last elements of this container.
+ #[inline]
+ pub fn init(&self) -> &[T] {
+ let s = self.as_ref();
+ &s[0..s.len() - 1]
+ }
+
+ /// Push an element to the end.
+ #[inline]
+ pub fn push(&mut self, value: T) {
+ if self.len == self.vec.len() {
+ self.vec.push(value);
+ } else {
+ self.vec[self.len] = value;
+ }
+ self.len += 1;
+ }
+
+ /// Pop last element.
+ #[inline]
+ pub fn pop(&mut self) -> Option<T> {
+ if self.len == 0 {
+ None
+ } else {
+ self.vec.truncate(self.len);
+ self.len -= 1;
+ self.vec.pop()
+ }
+ }
+
+ /// Insert an element at specified position.
+ #[inline]
+ pub fn insert(&mut self, index: usize, value: T) {
+ assert!(index <= self.len);
+ self.vec.insert(index, value);
+ self.len += 1;
+ }
+
+ /// Remove an element from specified position.
+ #[inline]
+ pub fn remove(&mut self, index: usize) -> T {
+ assert!(index < self.len);
+ self.len -= 1;
+ self.vec.remove(index)
+ }
+
+ /// Retains only the elements specified by the predicate.
+ ///
+ /// In other words, remove all elements `e` such that `f(&e)` returns
+ /// `false`. This method operates in place, visiting each element
+ /// exactly once in the original order, and preserves the order of the
+ /// retained elements.
+ ///
+ /// # Examples
+ ///
+ /// ```ignore
+ /// let mut vec = RepeatedField::from(vec![1, 2, 3, 4]);
+ /// vec.retain(|&x| x % 2 == 0);
+ /// assert_eq!(vec, RepeatedField::from(vec![2, 4]));
+ /// ```
+ pub fn retain<F>(&mut self, f: F)
+ where
+ F: FnMut(&T) -> bool,
+ {
+ // suboptimal
+ self.vec.truncate(self.len);
+ self.vec.retain(f);
+ self.len = self.vec.len();
+ }
+
+ /// Truncate at specified length.
+ #[inline]
+ pub fn truncate(&mut self, len: usize) {
+ if self.len > len {
+ self.len = len;
+ }
+ }
+
+ /// Reverse in place.
+ #[inline]
+ pub fn reverse(&mut self) {
+ self.as_mut_slice().reverse()
+ }
+
+ /// Into owned iterator.
+ ///
+ /// Drops the spare pooled tail first so the iterator yields exactly
+ /// the `len` live elements.
+ #[inline]
+ pub fn into_iter(mut self) -> vec::IntoIter<T> {
+ self.vec.truncate(self.len);
+ self.vec.into_iter()
+ }
+
+ /// Immutable data iterator over the live `..len` prefix.
+ #[inline]
+ pub fn iter(&self) -> slice::Iter<'_, T> {
+ self.as_ref().iter()
+ }
+
+ /// Mutable data iterator over the live `..len` prefix.
+ #[inline]
+ pub fn iter_mut(&mut self) -> slice::IterMut<'_, T> {
+ self.as_mut_slice().iter_mut()
+ }
+
+ /// Sort elements with given comparator.
+ #[inline]
+ pub fn sort_by<F>(&mut self, compare: F)
+ where
+ F: Fn(&T, &T) -> Ordering,
+ {
+ self.as_mut_slice().sort_by(compare)
+ }
+
+ /// Get data as raw pointer.
+ ///
+ /// Only the first `len` elements behind the pointer are live; the
+ /// backing storage may extend further with pooled spares.
+ #[inline]
+ pub fn as_ptr(&self) -> *const T {
+ self.vec.as_ptr()
+ }
+
+ /// Get data as a mutable raw pointer.
+ ///
+ /// Same caveat as [`as_ptr`]: only the first `len` elements are live.
+ #[inline]
+ pub fn as_mut_ptr(&mut self) -> *mut T {
+ self.vec.as_mut_ptr()
+ }
+}
+
+impl<T: Default + Clear> RepeatedField<T> {
+ /// Push default value.
+ /// This operation could be faster than `rf.push(Default::default())`,
+ /// because it may reuse previously allocated and cleared element.
+ pub fn push_default(&mut self) -> &mut T {
+ // A spare element already sits at `len` when the logical length was
+ // previously shrunk via `truncate`/`clear`: reset it with `clear()`
+ // instead of allocating a fresh default.
+ if self.len == self.vec.len() {
+ self.vec.push(Default::default());
+ } else {
+ self.vec[self.len].clear();
+ }
+ self.len += 1;
+ self.last_mut().unwrap()
+ }
+}
+
+// Conversions between `RepeatedField<T>` and standard containers.
+impl<T> From<Vec<T>> for RepeatedField<T> {
+ #[inline]
+ fn from(values: Vec<T>) -> RepeatedField<T> {
+ RepeatedField::from_vec(values)
+ }
+}
+
+impl<'a, T: Clone> From<&'a [T]> for RepeatedField<T> {
+ #[inline]
+ fn from(values: &'a [T]) -> RepeatedField<T> {
+ RepeatedField::from_slice(values)
+ }
+}
+
+impl<T> From<RepeatedField<T>> for Vec<T> {
+ #[inline]
+ fn from(val: RepeatedField<T>) -> Self {
+ val.into_vec()
+ }
+}
+
+impl<T: Clone> RepeatedField<T> {
+ /// Copy slice data to `RepeatedField`
+ #[inline]
+ pub fn from_slice(values: &[T]) -> RepeatedField<T> {
+ RepeatedField::from_vec(values.to_vec())
+ }
+
+ /// Copy anything viewable as a slice into a `RepeatedField`.
+ #[inline]
+ pub fn from_ref<X: AsRef<[T]>>(values: X) -> RepeatedField<T> {
+ RepeatedField::from_slice(values.as_ref())
+ }
+
+ /// Copy this data into new vec.
+ ///
+ /// Only the live `..len` prefix is copied (via `as_ref`).
+ #[inline]
+ pub fn to_vec(&self) -> Vec<T> {
+ self.as_ref().to_vec()
+ }
+}
+
+impl<T: Clone> Clone for RepeatedField<T> {
+ // Clones only the live `..len` prefix; spare pooled elements are not
+ // carried over to the copy.
+ #[inline]
+ fn clone(&self) -> RepeatedField<T> {
+ RepeatedField {
+ vec: self.to_vec(),
+ len: self.len(),
+ }
+ }
+}
+
+impl<T> FromIterator<T> for RepeatedField<T> {
+ #[inline]
+ fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> RepeatedField<T> {
+ RepeatedField::from_vec(FromIterator::from_iter(iter))
+ }
+}
+
+// Borrowed iteration yields only the live `..len` prefix (see `iter`).
+impl<'a, T> IntoIterator for &'a RepeatedField<T> {
+ type IntoIter = slice::Iter<'a, T>;
+ type Item = &'a T;
+
+ fn into_iter(self) -> slice::Iter<'a, T> {
+ self.iter()
+ }
+}
+
+impl<'a, T> IntoIterator for &'a mut RepeatedField<T> {
+ type IntoIter = slice::IterMut<'a, T>;
+ type Item = &'a mut T;
+
+ fn into_iter(self) -> slice::IterMut<'a, T> {
+ self.iter_mut()
+ }
+}
+
+impl<T> IntoIterator for RepeatedField<T> {
+ type IntoIter = vec::IntoIter<T>;
+ type Item = T;
+
+ fn into_iter(self) -> vec::IntoIter<T> {
+ // Delegates to the inherent `into_iter`, which drops the spare
+ // pooled tail before handing the `Vec` over.
+ RepeatedField::into_iter(self)
+ }
+}
+
+// Equality and membership compare only the live `..len` prefix (via
+// `as_ref`/`as_slice`); spare pooled elements never participate.
+impl<T: PartialEq> PartialEq for RepeatedField<T> {
+ #[inline]
+ fn eq(&self, other: &RepeatedField<T>) -> bool {
+ self.as_ref() == other.as_ref()
+ }
+}
+
+impl<T: Eq> Eq for RepeatedField<T> {}
+
+impl<T: PartialEq> PartialEq<[T]> for RepeatedField<T> {
+ fn eq(&self, other: &[T]) -> bool {
+ self.as_slice() == other
+ }
+}
+
+impl<T: PartialEq> PartialEq<RepeatedField<T>> for [T] {
+ fn eq(&self, other: &RepeatedField<T>) -> bool {
+ self == other.as_slice()
+ }
+}
+
+impl<T: PartialEq> RepeatedField<T> {
+ /// True iff this container contains given element.
+ #[inline]
+ pub fn contains(&self, value: &T) -> bool {
+ self.as_ref().contains(value)
+ }
+}
+
+// All borrowed views expose `&vec[..len]`, hiding the pooled spare tail.
+impl<T: Hash> Hash for RepeatedField<T> {
+ fn hash<H: Hasher>(&self, state: &mut H) {
+ self.as_ref().hash(state);
+ }
+}
+
+impl<T> AsRef<[T]> for RepeatedField<T> {
+ #[inline]
+ fn as_ref(&self) -> &[T] {
+ &self.vec[..self.len]
+ }
+}
+
+impl<T> Borrow<[T]> for RepeatedField<T> {
+ #[inline]
+ fn borrow(&self) -> &[T] {
+ &self.vec[..self.len]
+ }
+}
+
+impl<T> Deref for RepeatedField<T> {
+ type Target = [T];
+
+ #[inline]
+ fn deref(&self) -> &[T] {
+ &self.vec[..self.len]
+ }
+}
+
+impl<T> DerefMut for RepeatedField<T> {
+ #[inline]
+ fn deref_mut(&mut self) -> &mut [T] {
+ &mut self.vec[..self.len]
+ }
+}
+
+// Indexing is bounds-checked against the logical length (panics for
+// `index >= len`), since it goes through the `..len` slice view.
+impl<T> Index<usize> for RepeatedField<T> {
+ type Output = T;
+
+ #[inline]
+ fn index(&self, index: usize) -> &T {
+ &self.as_ref()[index]
+ }
+}
+
+impl<T> IndexMut<usize> for RepeatedField<T> {
+ #[inline]
+ fn index_mut(&mut self, index: usize) -> &mut T {
+ &mut self.as_mut_slice()[index]
+ }
+}
+
+impl<T> Extend<T> for RepeatedField<T> {
+ fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
+ // Drop the spare pooled tail so `Vec::extend` appends right after
+ // the live prefix, then resynchronise the logical length.
+ self.vec.truncate(self.len);
+ self.vec.extend(iter);
+ self.len = self.vec.len();
+ }
+}
+
+impl<'a, T: Copy + 'a> Extend<&'a T> for RepeatedField<T> {
+ fn extend<I: IntoIterator<Item = &'a T>>(&mut self, iter: I) {
+ self.vec.truncate(self.len);
+ self.vec.extend(iter);
+ self.len = self.vec.len();
+ }
+}
+
+impl<T: fmt::Debug> fmt::Debug for RepeatedField<T> {
+ // Debug-formats as a plain slice of the live elements.
+ #[inline]
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ self.as_ref().fmt(f)
+ }
+}
diff --git a/src/remote_write/tests/equivalence_test.rs
b/src/remote_write/tests/equivalence_test.rs
new file mode 100644
index 00000000..e545ecd4
--- /dev/null
+++ b/src/remote_write/tests/equivalence_test.rs
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This test is used to verify the correctness of the pooled parser rigorously
+//! (50 iterations, 25 iterations for each input data), by comparing the
+//! results with the prost parser.
+//!
+//! Test with `--features unsafe-split` to enable the unsafe optimization.
+
+use std::{fs, sync::Arc};
+
+use bytes::Bytes;
+use pb_types::{Exemplar, Label, MetricMetadata, Sample, TimeSeries,
WriteRequest};
+use prost::Message;
+use remote_write::pooled_parser::PooledParser;
+use tokio::task::JoinHandle;
+
+const ITERATIONS: usize = 50;
+
+/// Load the two captured remote-write payloads used by both tests.
+///
+/// # Panics
+///
+/// Panics if either workload file is missing or unreadable.
+fn load_test_data() -> (Bytes, Bytes) {
+ let data1 = Bytes::from(
+ fs::read("tests/workloads/1709380533560664458.data").expect("test data
load failed"),
+ );
+
+ let data2 = Bytes::from(
+ fs::read("tests/workloads/1709380533705807779.data").expect("test data
load failed"),
+ );
+
+ (data1, data2)
+}
+
+/// Reference decoder: parse `data` with the prost auto-generated parser.
+/// (`Bytes::clone` is a cheap refcount bump, not a data copy.)
+fn parse_with_prost(data: &Bytes) -> WriteRequest {
+ WriteRequest::decode(data.clone()).expect("prost decode failed")
+}
+
+/// Decode `data` with the hand-written pooled parser, then convert the
+/// result into the prost-generated `pb_types` structs so it can be
+/// compared field-by-field against `parse_with_prost`.
+///
+/// NOTE(review): `from_utf8_lossy` replaces invalid UTF-8 rather than
+/// failing; this assumes the workloads contain only valid UTF-8 strings,
+/// so the lossy conversion cannot mask a byte-level mismatch — confirm.
+async fn parse_with_pooled(data: &Bytes) -> WriteRequest {
+ let data_copy = data.clone();
+ let parser = PooledParser;
+ let pooled_request = parser
+ .decode_async(data_copy)
+ .await
+ .expect("pooled decode failed");
+
+ // Convert pooled types to pb_types to compare with prost.
+ let mut write_request = WriteRequest {
+ timeseries: Vec::new(),
+ metadata: Vec::new(),
+ };
+
+ for pooled_ts in &pooled_request.timeseries {
+ let mut timeseries = TimeSeries {
+ labels: Vec::new(),
+ samples: Vec::new(),
+ exemplars: Vec::new(),
+ };
+
+ // Labels are stored as raw bytes in the pooled types; rebuild
+ // owned `String`s for comparison with prost output.
+ for pooled_label in &pooled_ts.labels {
+ timeseries.labels.push(Label {
+ name: String::from_utf8_lossy(&pooled_label.name).to_string(),
+ value:
String::from_utf8_lossy(&pooled_label.value).to_string(),
+ });
+ }
+
+ for pooled_sample in &pooled_ts.samples {
+ timeseries.samples.push(Sample {
+ value: pooled_sample.value,
+ timestamp: pooled_sample.timestamp,
+ });
+ }
+
+ for pooled_exemplar in &pooled_ts.exemplars {
+ let mut exemplar = Exemplar {
+ labels: Vec::new(),
+ value: pooled_exemplar.value,
+ timestamp: pooled_exemplar.timestamp,
+ };
+
+ for pooled_label in &pooled_exemplar.labels {
+ exemplar.labels.push(Label {
+ name:
String::from_utf8_lossy(&pooled_label.name).to_string(),
+ value:
String::from_utf8_lossy(&pooled_label.value).to_string(),
+ });
+ }
+
+ timeseries.exemplars.push(exemplar);
+ }
+
+ write_request.timeseries.push(timeseries);
+ }
+
+ for pooled_metadata in pooled_request.metadata.iter() {
+ let metadata = MetricMetadata {
+ r#type: pooled_metadata.metric_type as i32,
+ metric_family_name:
String::from_utf8_lossy(&pooled_metadata.metric_family_name)
+ .to_string(),
+ help: String::from_utf8_lossy(&pooled_metadata.help).to_string(),
+ unit: String::from_utf8_lossy(&pooled_metadata.unit).to_string(),
+ };
+
+ write_request.metadata.push(metadata);
+ }
+
+ // pooled_request will be dropped here and returned to the pool.
+ write_request
+}
+
+/// Sequentially alternate between the two payloads (25 iterations each)
+/// so the pooled parser repeatedly reuses its buffers across differently
+/// shaped inputs, and assert its output matches prost every time.
+#[tokio::test]
+async fn test_sequential_correctness() {
+ let (data1, data2) = load_test_data();
+ let datasets = [&data1, &data2];
+
+ for iteration in 0..ITERATIONS {
+ let data_index = iteration % 2;
+ let data = datasets[data_index];
+
+ let prost_result = parse_with_prost(data);
+ let pooled_result = parse_with_pooled(data).await;
+
+ assert_eq!(
+ &prost_result, &pooled_result,
+ "Data {} WriteRequest mismatch",
+ data_index
+ );
+ }
+}
+
+/// Same equivalence check, but with all iterations spawned as concurrent
+/// tasks so the parser's pool is exercised under contention. Payloads are
+/// shared via `Arc` so each task borrows rather than copies them.
+#[tokio::test]
+async fn test_concurrent_correctness() {
+ let (data1, data2) = load_test_data();
+ let data1 = Arc::new(data1);
+ let data2 = Arc::new(data2);
+
+ let mut handles: Vec<JoinHandle<()>> = Vec::new();
+
+ for iteration in 0..ITERATIONS {
+ let data1_clone = Arc::clone(&data1);
+ let data2_clone = Arc::clone(&data2);
+
+ let handle = tokio::spawn(async move {
+ let data_index = iteration % 2;
+ let data = if data_index == 0 {
+ &*data1_clone
+ } else {
+ &*data2_clone
+ };
+
+ let prost_result = parse_with_prost(data);
+ let pooled_result = parse_with_pooled(data).await;
+
+ assert_eq!(
+ &prost_result, &pooled_result,
+ "Data {} WriteRequest mismatch",
+ data_index
+ );
+ });
+
+ handles.push(handle);
+ }
+
+ // Propagate any task panic (a failed assert inside a task surfaces
+ // here as a JoinError).
+ for handle in handles {
+ handle.await.expect("task completion failed");
+ }
+}
diff --git a/src/remote_write/tests/workloads/1709380533560664458.data
b/src/remote_write/tests/workloads/1709380533560664458.data
new file mode 100644
index 00000000..fb3cdad5
Binary files /dev/null and
b/src/remote_write/tests/workloads/1709380533560664458.data differ
diff --git a/src/remote_write/tests/workloads/1709380533705807779.data
b/src/remote_write/tests/workloads/1709380533705807779.data
new file mode 100644
index 00000000..47035421
Binary files /dev/null and
b/src/remote_write/tests/workloads/1709380533705807779.data differ
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]