Copilot commented on code in PR #218:
URL: https://github.com/apache/fluss-rust/pull/218#discussion_r2735067409


##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -395,6 +394,26 @@ where
     }
 }
 
+impl<F> Drop for CancellationSafeFuture<F>
+where
+    F: Future + Send + 'static,
+{
+    fn drop(&mut self) {
+        // If the future hasn't finished yet, we must ensure it completes in 
the background.
+        // This prevents leaving half-sent messages on the wire if the caller 
cancels the request.
+        if !self.done {
+            if let Some(fut) = self.inner.take() {
+                // Spawn a background task to finish the write/flush operation.
+                // The ownership of the stream_write lock-guard is held within 
the future,
+                // so no other request can start writing until this one 
finishes.
+                tokio::spawn(async move {
+                    let _ = fut.await;
+                });

Review Comment:
   `tokio::spawn(...)` inside `Drop` can panic if this type is dropped when no 
Tokio runtime is active (e.g., the connection is dropped after the runtime 
shuts down). Consider using `tokio::runtime::Handle::try_current()` and only 
spawning when a runtime is available, or otherwise choosing a non-panicking 
fallback strategy.



##########
crates/fluss/src/client/admin.rs:
##########
@@ -47,14 +47,15 @@ pub struct FlussAdmin {
 
 impl FlussAdmin {
     pub async fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> 
Result<Self> {
-        let admin_con = connections
-            .get_connection(
-                metadata
-                    .get_cluster()
-                    .get_coordinator_server()
-                    .expect("Couldn't coordinator server"),
-            )
-            .await?;
+        let admin_con =
+            connections
+                
.get_connection(metadata.get_cluster().get_coordinator_server().ok_or_else(
+                    || Error::UnexpectedError {
+                        message: "Couldn't coordinator server".to_string(),

Review Comment:
   The error message “Couldn't coordinator server” is unclear/grammatically 
incorrect. Consider changing it to something like “Coordinator server not found 
in cluster metadata” so callers can distinguish this from RPC connection 
failures.
   ```suggestion
                           message: "Coordinator server not found in cluster 
metadata".to_string(),
   ```



##########
crates/fluss/src/cluster/cluster.rs:
##########
@@ -188,7 +188,11 @@ impl Cluster {
             let table_id = table_metadata.table_id;
             let table_path = from_pb_table_path(&table_metadata.table_path);
             let table_descriptor = TableDescriptor::deserialize_json(
-                
&serde_json::from_slice(table_metadata.table_json.as_slice()).unwrap(),
+                
&serde_json::from_slice(table_metadata.table_json.as_slice()).map_err(|e| {
+                    Error::JsonSerdeError {
+                        message: format!("Error when parsing token from 
server: {e}"),

Review Comment:
   The `JsonSerdeError` message says “parsing token from server”, but this code 
is parsing `table_json` into a `TableDescriptor`. Consider updating the wording 
and including identifying context (e.g., `table_path`/`table_id`) to make 
debugging malformed server responses easier.
   ```suggestion
                           message: format!(
                               "Error deserializing table_json into 
TableDescriptor for table_id {} and table_path {}: {}",
                               table_id, table_path, e
                           ),
   ```



##########
crates/fluss/tests/integration/admin.rs:
##########
@@ -218,7 +221,10 @@ mod admin_test {
         assert_eq!(admin.table_exists(&table_path).await.unwrap(), false);
 
         // drop database
-        admin.drop_database(test_db_name, false, true).await;
+        admin
+            .drop_database(test_db_name, false, true)
+            .await
+            .expect("Should drop database");

Review Comment:
   This file still has another `drop_database(...).await;` later (in 
`test_partition_apis`) that ignores the returned `Result<()>`. Since `Result` 
is `#[must_use]`, this can trigger warnings/fail CI under `-D warnings`. Please 
handle it consistently (e.g., `expect(...)` or `let _ = ...`).



##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -290,7 +286,10 @@ where
 
         self.send_message(buf).await?;
         _cleanup_on_cancel.message_sent();
-        let mut response = rx.await.expect("Who closed this channel?!")?;
+        let mut response = rx.await.map_err(|e| Error::UnexpectedError {
+            message: "Got recvError, some one close the channel".to_string(),
+            source: None,

Review Comment:
   `rx.await.map_err(|e| ...)` discards the underlying `RecvError` (and sets 
`source: None`), and the new message is ungrammatical. It also introduces an 
unused `e` binding which will warn under `-D warnings`. Consider including `e` 
as the error source (or renaming to `_e`) and using a clearer message like 
“response channel closed unexpectedly”.
   ```suggestion
               message: "response channel closed unexpectedly".to_string(),
               source: Some(Box::new(e)),
   ```



##########
crates/fluss/src/client/write/accumulator.rs:
##########
@@ -96,9 +96,9 @@ impl RecordAccumulator {
         }
 
         let table_path = &record.table_path;
-        let table_info = cluster.get_table(table_path);
+        let table_info = cluster.get_table(table_path)?;
         let arrow_compression_info = 
table_info.get_table_config().get_arrow_compression_info()?;
-        let row_type = &cluster.get_table(table_path).row_type;
+        let row_type = &cluster.get_table(table_path)?.row_type;

Review Comment:
   `append_new_batch` now looks up the same table twice 
(`cluster.get_table(table_path)?` for `table_info` and again for `row_type`). 
This is redundant and complicates error paths; reuse `table_info` to access 
`row_type` instead.
   ```suggestion
           let row_type = &table_info.row_type;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to