Copilot commented on code in PR #218:
URL: https://github.com/apache/fluss-rust/pull/218#discussion_r2735067409
##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -395,6 +394,26 @@ where
}
}
+impl<F> Drop for CancellationSafeFuture<F>
+where
+ F: Future + Send + 'static,
+{
+ fn drop(&mut self) {
+ // If the future hasn't finished yet, we must ensure it completes in
the background.
+ // This prevents leaving half-sent messages on the wire if the caller
cancels the request.
+ if !self.done {
+ if let Some(fut) = self.inner.take() {
+ // Spawn a background task to finish the write/flush operation.
+ // The ownership of the stream_write lock-guard is held within
the future,
+ // so no other request can start writing until this one
finishes.
+ tokio::spawn(async move {
+ let _ = fut.await;
+ });
Review Comment:
`tokio::spawn(...)` inside `Drop` can panic if this type is dropped when no
Tokio runtime is active (e.g., the connection is dropped after the runtime
shuts down). Consider using `tokio::runtime::Handle::try_current()` and only
spawning when a runtime is available, or otherwise choosing a non-panicking
fallback strategy.
##########
crates/fluss/src/client/admin.rs:
##########
@@ -47,14 +47,15 @@ pub struct FlussAdmin {
impl FlussAdmin {
pub async fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) ->
Result<Self> {
- let admin_con = connections
- .get_connection(
- metadata
- .get_cluster()
- .get_coordinator_server()
- .expect("Couldn't coordinator server"),
- )
- .await?;
+ let admin_con =
+ connections
+
.get_connection(metadata.get_cluster().get_coordinator_server().ok_or_else(
+ || Error::UnexpectedError {
+ message: "Couldn't coordinator server".to_string(),
Review Comment:
The error message “Couldn't coordinator server” is unclear/grammatically
incorrect. Consider changing it to something like “Coordinator server not found
in cluster metadata” so callers can distinguish this from RPC connection
failures.
```suggestion
message: "Coordinator server not found in cluster
metadata".to_string(),
```
##########
crates/fluss/src/cluster/cluster.rs:
##########
@@ -188,7 +188,11 @@ impl Cluster {
let table_id = table_metadata.table_id;
let table_path = from_pb_table_path(&table_metadata.table_path);
let table_descriptor = TableDescriptor::deserialize_json(
-
&serde_json::from_slice(table_metadata.table_json.as_slice()).unwrap(),
+
&serde_json::from_slice(table_metadata.table_json.as_slice()).map_err(|e| {
+ Error::JsonSerdeError {
+ message: format!("Error when parsing token from
server: {e}"),
Review Comment:
The `JsonSerdeError` message says “parsing token from server”, but this code
is parsing `table_json` into a `TableDescriptor`. Consider updating the wording
and including identifying context (e.g., `table_path`/`table_id`) to make
debugging malformed server responses easier.
```suggestion
message: format!(
"Error deserializing table_json into
TableDescriptor for table_id {} and table_path {}: {}",
table_id, table_path, e
),
```
##########
crates/fluss/tests/integration/admin.rs:
##########
@@ -218,7 +221,10 @@ mod admin_test {
assert_eq!(admin.table_exists(&table_path).await.unwrap(), false);
// drop database
- admin.drop_database(test_db_name, false, true).await;
+ admin
+ .drop_database(test_db_name, false, true)
+ .await
+ .expect("Should drop database");
Review Comment:
This file still has another `drop_database(...).await;` later (in
`test_partition_apis`) that ignores the returned `Result<()>`. Since `Result`
is `#[must_use]`, this can trigger warnings/fail CI under `-D warnings`. Please
handle it consistently (e.g., `expect(...)` or `let _ = ...`).
##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -290,7 +286,10 @@ where
self.send_message(buf).await?;
_cleanup_on_cancel.message_sent();
- let mut response = rx.await.expect("Who closed this channel?!")?;
+ let mut response = rx.await.map_err(|e| Error::UnexpectedError {
+ message: "Got recvError, some one close the channel".to_string(),
+ source: None,
Review Comment:
`rx.await.map_err(|e| ...)` discards the underlying `RecvError` (and sets
`source: None`), and the new message is ungrammatical. It also introduces an
unused `e` binding which will warn under `-D warnings`. Consider including `e`
as the error source (or renaming to `_e`) and using a clearer message like
“response channel closed unexpectedly”.
```suggestion
message: "response channel closed unexpectedly".to_string(),
source: Some(Box::new(e)),
```
##########
crates/fluss/src/client/write/accumulator.rs:
##########
@@ -96,9 +96,9 @@ impl RecordAccumulator {
}
let table_path = &record.table_path;
- let table_info = cluster.get_table(table_path);
+ let table_info = cluster.get_table(table_path)?;
let arrow_compression_info =
table_info.get_table_config().get_arrow_compression_info()?;
- let row_type = &cluster.get_table(table_path).row_type;
+ let row_type = &cluster.get_table(table_path)?.row_type;
Review Comment:
`append_new_batch` now looks up the same table twice
(`cluster.get_table(table_path)?` for `table_info` and again for `row_type`).
This is redundant and complicates error paths; reuse `table_info` to access
`row_type` instead.
```suggestion
let row_type = &table_info.row_type;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]