Copilot commented on code in PR #399:
URL: https://github.com/apache/fluss-rust/pull/399#discussion_r2868380702
##########
bindings/python/src/config.rs:
##########
@@ -113,6 +113,11 @@ impl Config {
FlussError::new_err(format!("Invalid value
'{value}' for '{key}': {e}"))
})?;
}
+ "request-timeout-ms" => {
+ config.request_timeout_ms =
value.parse::<u64>().map_err(|e| {
+ FlussError::new_err(format!("Invalid value
'{value}' for '{key}': {e}"))
+ })?;
+ }
Review Comment:
The Python property key for this new setting is introduced as
`request-timeout-ms`, but the existing connect timeout uses `connect-timeout`
(no `-ms`). To keep the public Python config API consistent and reduce user
confusion, consider supporting an alias (e.g., accept both `request-timeout`
and `request-timeout-ms`) and/or aligning naming with the established
`connect-timeout` key style.
##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -388,10 +404,28 @@ where
self.send_message(buf).await?;
_cleanup_on_cancel.message_sent();
- let mut response = rx.await.map_err(|e| Error::UnexpectedError {
- message: "Got recvError, some one close the channel".to_string(),
- source: Some(Box::new(e)),
- })??;
+ let mut response = match self.request_timeout {
+ Some(timeout) => match tokio::time::timeout(timeout, rx).await {
+ Ok(result) => result.map_err(|e| Error::UnexpectedError {
+ message: "Got recvError, some one close the
channel".to_string(),
+ source: Some(Box::new(e)),
+ })??,
+ Err(_elapsed) => {
+ if let ConnectionState::RequestMap(map) =
self.state.lock().deref_mut() {
+ map.remove(&request_id);
+ }
+ return Err(RpcError::RequestTimeout {
+ timeout,
+ api_key: R::API_KEY,
+ }
+ .into());
Review Comment:
When a request times out, the entry is removed from the request map. If a
late response arrives afterward, the read loop will log a WARN about an
"unknown request". With timeouts enabled, this can become noisy in production;
consider suppressing or downgrading that log for timed-out request_ids (e.g.,
briefly tracking timed-out IDs) so that expected late responses do not emit
WARNs.
##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -561,3 +595,50 @@ impl Drop for CleanupRequestStateOnCancel {
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::metadata::TablePath;
+ use crate::rpc::message::TableExistsRequest;
+
+ #[tokio::test]
+ async fn test_request_timeout() {
+ // Create a duplex stream where the "server" side never responds.
+ let (client_stream, _server_stream) = tokio::io::duplex(1024);
+
+ let conn = ServerConnectionInner::new(
+ BufStream::new(client_stream),
+ usize::MAX,
+ Arc::from("test"),
+ Some(Duration::from_millis(50)),
+ );
+
+ let table_path = TablePath::new("db", "table");
+ let request = TableExistsRequest::new(&table_path);
+ let result = conn.request(request).await;
+
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ let err_msg = err.to_string();
+ assert!(
+ err_msg.contains("timed out"),
+ "expected timeout error, got: {err_msg}"
Review Comment:
This test asserts on the formatted error string ("timed out"), which is
brittle if the Display text changes. Prefer asserting on the concrete error
variant (e.g., matching `Error::RpcError { source: RpcError::RequestTimeout {
.. }, .. }`) so the test validates behavior rather than wording.
##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -561,3 +595,50 @@ impl Drop for CleanupRequestStateOnCancel {
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::metadata::TablePath;
+ use crate::rpc::message::TableExistsRequest;
+
+ #[tokio::test]
+ async fn test_request_timeout() {
+ // Create a duplex stream where the "server" side never responds.
+ let (client_stream, _server_stream) = tokio::io::duplex(1024);
+
+ let conn = ServerConnectionInner::new(
+ BufStream::new(client_stream),
+ usize::MAX,
+ Arc::from("test"),
+ Some(Duration::from_millis(50)),
+ );
+
+ let table_path = TablePath::new("db", "table");
+ let request = TableExistsRequest::new(&table_path);
+ let result = conn.request(request).await;
+
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ let err_msg = err.to_string();
+ assert!(
+ err_msg.contains("timed out"),
+ "expected timeout error, got: {err_msg}"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_request_no_timeout() {
+ // With None timeout, verify a connection can still be constructed
without panics.
+ let (client_stream, _server_stream) = tokio::io::duplex(1024);
+
+ let conn = ServerConnectionInner::new(
+ BufStream::new(client_stream),
+ usize::MAX,
+ Arc::from("test"),
+ None,
+ );
+
+ assert!(!conn.is_poisoned());
+ }
Review Comment:
`test_request_no_timeout` doesn't currently verify the "no timeout"
behavior—`assert!(!conn.is_poisoned())` will be true immediately after
construction in normal cases. Consider either removing this test or changing it
to assert that a request remains pending for longer than some short duration
(without hanging the test).
##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -388,10 +404,28 @@ where
self.send_message(buf).await?;
_cleanup_on_cancel.message_sent();
- let mut response = rx.await.map_err(|e| Error::UnexpectedError {
- message: "Got recvError, some one close the channel".to_string(),
- source: Some(Box::new(e)),
- })??;
+ let mut response = match self.request_timeout {
+ Some(timeout) => match tokio::time::timeout(timeout, rx).await {
+ Ok(result) => result.map_err(|e| Error::UnexpectedError {
+ message: "Got recvError, some one close the
channel".to_string(),
+ source: Some(Box::new(e)),
+ })??,
Review Comment:
The RecvError mapping message is ungrammatical and not very actionable ("Got
recvError, some one close the channel"). Consider rewording to something
clearer like "response channel closed" and including context (e.g., request_id
/ api key), and update both branches (timeout/non-timeout) consistently.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]