Copilot commented on code in PR #407:
URL: https://github.com/apache/fluss-rust/pull/407#discussion_r2907744315
##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -217,6 +218,89 @@ struct ActiveRequest {
channel: Sender<Result<Response, RpcError>>,
}
+/// Tracks per-request connection metrics and ensures in-flight gauge cleanup
on drop.
+struct RequestMetricsLifecycle {
+ label: Option<&'static str>,
+ start: Instant,
+ completed: bool,
+}
+
+impl RequestMetricsLifecycle {
+ fn begin(api_key: crate::rpc::ApiKey, request_bytes: u64) -> Self {
+ let label = crate::metrics::api_key_label(api_key);
+ if let Some(label) = label {
+ // Match Java semantics: count request attempts before write/send.
+ metrics::counter!(
+ crate::metrics::CLIENT_REQUESTS_TOTAL,
+ crate::metrics::LABEL_API_KEY => label
+ )
+ .increment(1);
+ metrics::counter!(
+ crate::metrics::CLIENT_BYTES_SENT_TOTAL,
+ crate::metrics::LABEL_API_KEY => label
+ )
+ .increment(request_bytes);
+ metrics::gauge!(
+ crate::metrics::CLIENT_REQUESTS_IN_FLIGHT,
+ crate::metrics::LABEL_API_KEY => label
+ )
+ .increment(1.0);
+ }
+ Self {
+ label,
+ start: Instant::now(),
+ completed: false,
+ }
Review Comment:
`RequestMetricsLifecycle::begin` always captures `Instant::now()` and
allocates a lifecycle object even when the API key is non-reportable (`label ==
None`). Since the start timestamp is only used for latency/histogram on labeled
series, consider skipping `Instant::now()` (e.g., store `start:
Option<Instant>` or return `Option<RequestMetricsLifecycle>` from `begin`) to
avoid adding overhead to admin/metadata/auth requests that intentionally do not
emit metrics.
##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -561,3 +667,435 @@ impl Drop for CleanupRequestStateOnCancel {
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::error::Error;
+ use crate::rpc::ApiKey;
+ use crate::rpc::api_version::ApiVersion;
+ use crate::rpc::frame::{ReadError, WriteError};
+ use crate::rpc::message::{ReadVersionedType, RequestBody,
WriteVersionedType};
+ use metrics::{SharedString, Unit};
+ use metrics_util::CompositeKey;
+ use metrics_util::debugging::{DebugValue, DebuggingRecorder};
+ use std::sync::OnceLock;
+ use tokio::io::{AsyncReadExt, AsyncWriteExt, BufStream};
+ use tokio::sync::Mutex as AsyncMutex;
+
+ // -- Test-only request/response types --------------------------------
+
+ struct TestProduceRequest;
+ struct TestProduceResponse;
+
+ impl RequestBody for TestProduceRequest {
+ type ResponseBody = TestProduceResponse;
+ const API_KEY: ApiKey = ApiKey::ProduceLog;
+ const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+ }
+
+ impl WriteVersionedType<Vec<u8>> for TestProduceRequest {
+ fn write_versioned(&self, _w: &mut Vec<u8>, _v: ApiVersion) ->
Result<(), WriteError> {
+ Ok(())
+ }
+ }
+
+ impl ReadVersionedType<Cursor<Vec<u8>>> for TestProduceResponse {
+ fn read_versioned(_r: &mut Cursor<Vec<u8>>, _v: ApiVersion) ->
Result<Self, ReadError> {
+ Ok(TestProduceResponse)
+ }
+ }
+
+ struct TestMetadataRequest;
+ struct TestMetadataResponse;
+
+ impl RequestBody for TestMetadataRequest {
+ type ResponseBody = TestMetadataResponse;
+ const API_KEY: ApiKey = ApiKey::MetaData;
+ const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+ }
+
+ impl WriteVersionedType<Vec<u8>> for TestMetadataRequest {
+ fn write_versioned(&self, _w: &mut Vec<u8>, _v: ApiVersion) ->
Result<(), WriteError> {
+ Ok(())
+ }
+ }
+
+ impl ReadVersionedType<Cursor<Vec<u8>>> for TestMetadataResponse {
+ fn read_versioned(_r: &mut Cursor<Vec<u8>>, _v: ApiVersion) ->
Result<Self, ReadError> {
+ Ok(TestMetadataResponse)
+ }
+ }
+
+ // -- Mock server -----------------------------------------------------
+
+ /// Reads framed requests and echoes back minimal success responses.
+ async fn mock_echo_server(mut stream: tokio::io::DuplexStream) {
+ loop {
+ let mut len_buf = [0u8; 4];
+ if stream.read_exact(&mut len_buf).await.is_err() {
+ return;
+ }
+ let len = i32::from_be_bytes(len_buf) as usize;
+
+ let mut payload = vec![0u8; len];
+ if stream.read_exact(&mut payload).await.is_err() {
+ return;
+ }
+
+ // Header layout: api_key(2) + api_version(2) + request_id(4)
+ let request_id = i32::from_be_bytes([payload[4], payload[5],
payload[6], payload[7]]);
+
+ // Response: resp_type(1, 0=success) + request_id(4)
+ let mut resp = Vec::with_capacity(5);
+ resp.push(0u8);
+ resp.extend_from_slice(&request_id.to_be_bytes());
+
+ let resp_len = (resp.len() as i32).to_be_bytes();
+ if stream.write_all(&resp_len).await.is_err()
+ || stream.write_all(&resp).await.is_err()
+ || stream.flush().await.is_err()
+ {
+ return;
+ }
+ }
+ }
+
+ /// Reads framed requests and echoes back error responses (resp_type=1).
+ async fn mock_error_server(mut stream: tokio::io::DuplexStream) {
+ use prost::Message;
+
+ loop {
+ let mut len_buf = [0u8; 4];
+ if stream.read_exact(&mut len_buf).await.is_err() {
+ return;
+ }
+ let len = i32::from_be_bytes(len_buf) as usize;
+
+ let mut payload = vec![0u8; len];
+ if stream.read_exact(&mut payload).await.is_err() {
+ return;
+ }
+
+ let request_id = i32::from_be_bytes([payload[4], payload[5],
payload[6], payload[7]]);
+
+ let err = crate::proto::ErrorResponse {
+ error_code: 1,
+ error_message: Some("test error".to_string()),
+ };
+ let mut err_buf = Vec::new();
+ err.encode(&mut err_buf).expect("ErrorResponse encode");
+
+ let mut resp = Vec::with_capacity(5 + err_buf.len());
+ resp.push(1u8); // ERROR_RESPONSE
+ resp.extend_from_slice(&request_id.to_be_bytes());
+ resp.extend(err_buf);
+
+ let resp_len = (resp.len() as i32).to_be_bytes();
+ if stream.write_all(&resp_len).await.is_err()
+ || stream.write_all(&resp).await.is_err()
+ || stream.flush().await.is_err()
+ {
+ return;
+ }
+ }
+ }
+
+ // -- Recorder setup --------------------------------------------------
+
+ /// Shared test recorder (installed once per test binary).
+ static TEST_SNAPSHOTTER: OnceLock<metrics_util::debugging::Snapshotter> =
OnceLock::new();
+ static TEST_LOCK: OnceLock<AsyncMutex<()>> = OnceLock::new();
+
+ fn test_snapshotter() -> &'static metrics_util::debugging::Snapshotter {
+ TEST_SNAPSHOTTER.get_or_init(|| {
+ let recorder = DebuggingRecorder::new();
+ let snapshotter = recorder.snapshotter();
+ recorder
+ .install()
+ .expect("debugging recorder install should succeed in this
test binary");
+ snapshotter
+ })
+ }
+
+ fn test_lock() -> &'static AsyncMutex<()> {
+ TEST_LOCK.get_or_init(|| AsyncMutex::new(()))
+ }
+
+ type SnapshotEntry = (CompositeKey, Option<Unit>, Option<SharedString>,
DebugValue);
+
+ fn has_api_label(key: &CompositeKey, label: &str) -> bool {
+ key.key()
+ .labels()
+ .any(|l| l.key() == crate::metrics::LABEL_API_KEY && l.value() ==
label)
+ }
+
+ fn counter_for_label(entries: &[SnapshotEntry], metric_name: &str, label:
&str) -> u64 {
+ entries
+ .iter()
+ .find_map(|(key, _, _, value)| {
+ if key.key().name() != metric_name || !has_api_label(key,
label) {
+ return None;
+ }
+ match value {
+ DebugValue::Counter(v) => Some(*v),
+ _ => None,
+ }
+ })
+ .unwrap_or(0)
+ }
+
+ fn gauge_for_label(entries: &[SnapshotEntry], metric_name: &str, label:
&str) -> f64 {
+ entries
+ .iter()
+ .find_map(|(key, _, _, value)| {
+ if key.key().name() != metric_name || !has_api_label(key,
label) {
+ return None;
+ }
+ match value {
+ DebugValue::Gauge(v) => Some(v.into_inner()),
+ _ => None,
+ }
+ })
+ .unwrap_or(0.0)
+ }
+
+ fn counter_sum(entries: &[SnapshotEntry], metric_name: &str) -> u64 {
+ entries
+ .iter()
+ .filter_map(|(key, _, _, value)| {
+ if key.key().name() != metric_name {
+ return None;
+ }
+ match value {
+ DebugValue::Counter(v) => Some(*v),
+ _ => None,
+ }
+ })
+ .sum()
+ }
+
+ // -- Tests -----------------------------------------------------------
+
+ #[tokio::test]
+ async fn request_records_metrics_for_reportable_api_key() {
+ let _test_guard = test_lock().lock().await;
+ let snapshotter = test_snapshotter();
+
+ let (client, server) = tokio::io::duplex(4096);
+ tokio::spawn(mock_echo_server(server));
+
+ let conn = ServerConnectionInner::new(BufStream::new(client),
usize::MAX, Arc::from("t"));
+
+ let before: Vec<_> = snapshotter.snapshot().into_vec();
+ let request_before = counter_for_label(
+ &before,
+ crate::metrics::CLIENT_REQUESTS_TOTAL,
+ "produce_log",
+ );
+ let response_before = counter_for_label(
+ &before,
+ crate::metrics::CLIENT_RESPONSES_TOTAL,
+ "produce_log",
+ );
+
+ conn.request(TestProduceRequest).await.unwrap();
+
+ let after: Vec<_> = snapshotter.snapshot().into_vec();
+ let request_after =
+ counter_for_label(&after, crate::metrics::CLIENT_REQUESTS_TOTAL,
"produce_log");
+ let response_after = counter_for_label(
+ &after,
+ crate::metrics::CLIENT_RESPONSES_TOTAL,
+ "produce_log",
+ );
+ assert_eq!(
+ request_after - request_before,
+ 1,
+ "produce_log request counter should increment by 1"
+ );
+ assert_eq!(
+ response_after - response_before,
+ 1,
+ "produce_log completion counter should increment by 1"
+ );
+
+ let has_latency_sample = after.iter().any(|(key, _, _, value)| {
+ key.key().name() == crate::metrics::CLIENT_REQUEST_LATENCY_MS
+ && has_api_label(key, "produce_log")
+ && matches!(value, DebugValue::Histogram(_))
+ });
+ assert!(
+ has_latency_sample,
+ "request latency histogram should be recorded for produce_log"
+ );
Review Comment:
This test only checks that *some* histogram entry exists after the request,
but since the recorder/snapshotter is shared across tests and Rust test order
is not guaranteed, the histogram may already exist from a prior test run,
making this assertion a false positive. Consider comparing before/after
histogram sample counts for `(CLIENT_REQUEST_LATENCY_MS,
api_key="produce_log")` (or resetting/isolating the recorder per test) so the
test proves the request under test actually recorded latency.
##########
crates/fluss/src/metrics.rs:
##########
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metric name constants and helpers for fluss-rust client instrumentation.
+//!
+//! Uses the [`metrics`] crate facade pattern: library code emits metrics via
+//! `counter!`/`gauge!`/`histogram!` macros, and the application installs a
+//! recorder (e.g. `metrics-exporter-prometheus`) to collect them. When no
+//! recorder is installed, all metric calls are no-ops with zero overhead.
+
+use crate::rpc::ApiKey;
+
+// ---------------------------------------------------------------------------
+// Label keys
+// ---------------------------------------------------------------------------
+
+pub const LABEL_API_KEY: &str = "api_key";
+
+// ---------------------------------------------------------------------------
+// Connection / RPC metrics
+//
+// Java reference: ConnectionMetrics.java, ClientMetricGroup.java,
MetricNames.java
+//
+// Byte counting matches Java semantics: both sides count only the API message
+// body, excluding the protocol header and framing.
+// Java: rawRequest.totalSize() / response.totalSize() (see MessageCodec.java).
+// Rust: buf.len() - REQUEST_HEADER_LENGTH for sent bytes,
+// buffer.len() - cursor.position() for received bytes.
+// ---------------------------------------------------------------------------
+
+pub const CLIENT_REQUESTS_TOTAL: &str = "fluss.client.requests.total";
+pub const CLIENT_RESPONSES_TOTAL: &str = "fluss.client.responses.total";
+pub const CLIENT_BYTES_SENT_TOTAL: &str = "fluss.client.bytes_sent.total";
+pub const CLIENT_BYTES_RECEIVED_TOTAL: &str =
"fluss.client.bytes_received.total";
+pub const CLIENT_REQUEST_LATENCY_MS: &str = "fluss.client.request_latency_ms";
+pub const CLIENT_REQUESTS_IN_FLIGHT: &str = "fluss.client.requests_in_flight";
+
Review Comment:
The PR description indicates it will close #390, but #390 explicitly calls
for instrumenting `Sender` and `LogScanner` in addition to providing a metrics
framework. The code in this PR adds connection-level request metrics only;
consider adjusting the PR/issue linkage (don’t close #390 yet, or update the
issue/scope) so tracking reflects what’s actually delivered.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]