Copilot commented on code in PR #399:
URL: https://github.com/apache/fluss-rust/pull/399#discussion_r2868380702


##########
bindings/python/src/config.rs:
##########
@@ -113,6 +113,11 @@ impl Config {
                             FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
                         })?;
                     }
+                    "request-timeout-ms" => {
+                        config.request_timeout_ms = value.parse::<u64>().map_err(|e| {
+                            FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
+                        })?;
+                    }

Review Comment:
   The Python property key for this new setting is introduced as 
`request-timeout-ms`, but the existing connect timeout uses `connect-timeout` 
(no `-ms` suffix). To keep the public Python config API consistent and reduce 
user confusion, consider supporting an alias (e.g., accept both 
`request-timeout` and `request-timeout-ms`) and/or aligning the name with the 
established `connect-timeout` key style.



##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -388,10 +404,28 @@ where
 
         self.send_message(buf).await?;
         _cleanup_on_cancel.message_sent();
-        let mut response = rx.await.map_err(|e| Error::UnexpectedError {
-            message: "Got recvError, some one close the channel".to_string(),
-            source: Some(Box::new(e)),
-        })??;
+        let mut response = match self.request_timeout {
+            Some(timeout) => match tokio::time::timeout(timeout, rx).await {
+                Ok(result) => result.map_err(|e| Error::UnexpectedError {
+                    message: "Got recvError, some one close the channel".to_string(),
+                    source: Some(Box::new(e)),
+                })??,
+                Err(_elapsed) => {
+                    if let ConnectionState::RequestMap(map) = self.state.lock().deref_mut() {
+                        map.remove(&request_id);
+                    }
+                    return Err(RpcError::RequestTimeout {
+                        timeout,
+                        api_key: R::API_KEY,
+                    }
+                    .into());

Review Comment:
   When a request times out, the entry is removed from the request map. If a 
late response arrives afterward, the read loop will log a WARN about an 
"unknown request". With timeouts enabled this can become noisy in production; 
consider suppressing/downgrading that log for timed-out request_ids (e.g., 
track timed-out IDs briefly) or otherwise avoid emitting WARNs for expected 
late responses.



##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -561,3 +595,50 @@ impl Drop for CleanupRequestStateOnCancel {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::TablePath;
+    use crate::rpc::message::TableExistsRequest;
+
+    #[tokio::test]
+    async fn test_request_timeout() {
+        // Create a duplex stream where the "server" side never responds.
+        let (client_stream, _server_stream) = tokio::io::duplex(1024);
+
+        let conn = ServerConnectionInner::new(
+            BufStream::new(client_stream),
+            usize::MAX,
+            Arc::from("test"),
+            Some(Duration::from_millis(50)),
+        );
+
+        let table_path = TablePath::new("db", "table");
+        let request = TableExistsRequest::new(&table_path);
+        let result = conn.request(request).await;
+
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        let err_msg = err.to_string();
+        assert!(
+            err_msg.contains("timed out"),
+            "expected timeout error, got: {err_msg}"

Review Comment:
   This test asserts on the formatted error string ("timed out"), which is 
brittle if the Display text changes. Prefer asserting on the concrete error 
variant (e.g., matching `Error::RpcError { source: RpcError::RequestTimeout { 
.. }, .. }`) so the test validates behavior rather than wording.



##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -561,3 +595,50 @@ impl Drop for CleanupRequestStateOnCancel {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::TablePath;
+    use crate::rpc::message::TableExistsRequest;
+
+    #[tokio::test]
+    async fn test_request_timeout() {
+        // Create a duplex stream where the "server" side never responds.
+        let (client_stream, _server_stream) = tokio::io::duplex(1024);
+
+        let conn = ServerConnectionInner::new(
+            BufStream::new(client_stream),
+            usize::MAX,
+            Arc::from("test"),
+            Some(Duration::from_millis(50)),
+        );
+
+        let table_path = TablePath::new("db", "table");
+        let request = TableExistsRequest::new(&table_path);
+        let result = conn.request(request).await;
+
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        let err_msg = err.to_string();
+        assert!(
+            err_msg.contains("timed out"),
+            "expected timeout error, got: {err_msg}"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_request_no_timeout() {
+        // With None timeout, verify a connection can still be constructed without panics.
+        let (client_stream, _server_stream) = tokio::io::duplex(1024);
+
+        let conn = ServerConnectionInner::new(
+            BufStream::new(client_stream),
+            usize::MAX,
+            Arc::from("test"),
+            None,
+        );
+
+        assert!(!conn.is_poisoned());
+    }

Review Comment:
   `test_request_no_timeout` doesn't currently verify the "no timeout" 
behavior: the test never sends a request, and `assert!(!conn.is_poisoned())` 
will normally pass immediately after construction. Consider either removing 
this test or changing it to assert that a request remains pending for longer 
than some short duration (e.g., wrap `conn.request(...)` in a short 
`tokio::time::timeout` and assert that it elapses) so the test cannot hang.



##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -388,10 +404,28 @@ where
 
         self.send_message(buf).await?;
         _cleanup_on_cancel.message_sent();
-        let mut response = rx.await.map_err(|e| Error::UnexpectedError {
-            message: "Got recvError, some one close the channel".to_string(),
-            source: Some(Box::new(e)),
-        })??;
+        let mut response = match self.request_timeout {
+            Some(timeout) => match tokio::time::timeout(timeout, rx).await {
+                Ok(result) => result.map_err(|e| Error::UnexpectedError {
+                    message: "Got recvError, some one close the channel".to_string(),
+                    source: Some(Box::new(e)),
+                })??,

Review Comment:
   The RecvError mapping message is ungrammatical and not very actionable ("Got 
recvError, some one close the channel"). Consider rewording to something 
clearer like "response channel closed" and including context (e.g., request_id 
/ api key), and update both branches (timeout/non-timeout) consistently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to