This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b9d1d8  chore: minor fix for rust code (#218)
2b9d1d8 is described below

commit 2b9d1d8c3238a7aaa3d7514d5caebdb661bd6214
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Jan 28 21:52:02 2026 +0800

    chore: minor fix for rust code (#218)
---
 crates/fluss/src/client/admin.rs                   | 24 ++++----
 crates/fluss/src/client/connection.rs              | 22 ++++++--
 crates/fluss/src/client/metadata.rs                |  9 ++-
 crates/fluss/src/client/write/accumulator.rs       | 18 +++---
 crates/fluss/src/client/write/sender.rs            |  2 +-
 crates/fluss/src/cluster/cluster.rs                | 27 ++++++---
 crates/fluss/src/io/file_io.rs                     |  4 +-
 .../fluss/src/record/kv/kv_record_batch_builder.rs |  3 +-
 crates/fluss/src/row/encode/mod.rs                 | 14 ++---
 crates/fluss/src/rpc/server_connection.rs          | 65 +++++++++++++++-------
 crates/fluss/src/util/mod.rs                       |  2 +-
 crates/fluss/tests/integration/admin.rs            | 15 ++++-
 12 files changed, 136 insertions(+), 69 deletions(-)

diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index bffe0f5..286c46c 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -47,14 +47,15 @@ pub struct FlussAdmin {
 
 impl FlussAdmin {
     pub async fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> 
Result<Self> {
-        let admin_con = connections
-            .get_connection(
-                metadata
-                    .get_cluster()
-                    .get_coordinator_server()
-                    .expect("Couldn't coordinator server"),
-            )
-            .await?;
+        let admin_con =
+            connections
+                
.get_connection(metadata.get_cluster().get_coordinator_server().ok_or_else(
+                    || Error::UnexpectedError {
+                        message: "Coordinator server not found in cluster 
metadata".to_string(),
+                        source: None,
+                    },
+                )?)
+                .await?;
 
         Ok(FlussAdmin {
             admin_gateway: admin_con,
@@ -211,7 +212,7 @@ impl FlussAdmin {
         database_name: &str,
         ignore_if_not_exists: bool,
         cascade: bool,
-    ) {
+    ) -> Result<()> {
         let _response = self
             .admin_gateway
             .request(DropDatabaseRequest::new(
@@ -219,7 +220,8 @@ impl FlussAdmin {
                 ignore_if_not_exists,
                 cascade,
             ))
-            .await;
+            .await?;
+        Ok(())
     }
 
     /// List all databases
@@ -298,7 +300,7 @@ impl FlussAdmin {
         }
 
         let cluster = self.metadata.get_cluster();
-        let table_id = cluster.get_table(table_path).table_id;
+        let table_id = cluster.get_table(table_path)?.table_id;
 
         // Prepare requests
         let requests_by_server =
diff --git a/crates/fluss/src/client/connection.rs 
b/crates/fluss/src/client/connection.rs
index 0e41bbe..a19dbd2 100644
--- a/crates/fluss/src/client/connection.rs
+++ b/crates/fluss/src/client/connection.rs
@@ -68,19 +68,31 @@ impl FlussConnection {
     }
 
     pub fn get_or_create_writer_client(&self) -> Result<Arc<WriterClient>> {
+        // 1. Fast path: Attempt to acquire a read lock to check if the client 
already exists.
         if let Some(client) = self.writer_client.read().as_ref() {
             return Ok(client.clone());
         }
 
-        // If not exists, create new one
-        let client = Arc::new(WriterClient::new(self.args.clone(), 
self.metadata.clone())?);
-        *self.writer_client.write() = Some(client.clone());
-        Ok(client)
+        // 2. Slow path: Acquire the write lock.
+        let mut writer_guard = self.writer_client.write();
+
+        // 3. Double-check: Another thread might have initialized the client
+        // while this thread was waiting for the write lock.
+        if let Some(client) = writer_guard.as_ref() {
+            return Ok(client.clone());
+        }
+
+        // 4. Initialize the client since we are certain it doesn't exist yet.
+        let new_client = Arc::new(WriterClient::new(self.args.clone(), 
self.metadata.clone())?);
+
+        // 5. Store and return the newly created client.
+        *writer_guard = Some(new_client.clone());
+        Ok(new_client)
     }
 
     pub async fn get_table(&self, table_path: &TablePath) -> 
Result<FlussTable<'_>> {
         self.metadata.update_table_metadata(table_path).await?;
-        let table_info = 
self.metadata.get_cluster().get_table(table_path).clone();
+        let table_info = 
self.metadata.get_cluster().get_table(table_path)?.clone();
         if table_info.is_partitioned() {
             return Err(crate::error::Error::UnsupportedOperation {
                 message: "Partitioned tables are not supported".to_string(),
diff --git a/crates/fluss/src/client/metadata.rs 
b/crates/fluss/src/client/metadata.rs
index 3c6730b..614f6e7 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::cluster::{Cluster, ServerNode, ServerType};
-use crate::error::Result;
+use crate::error::{Error, Result};
 use crate::metadata::{PhysicalTablePath, TableBucket, TablePath};
 use crate::proto::MetadataResponse;
 use crate::rpc::message::UpdateMetadataRequest;
@@ -45,7 +45,12 @@ impl Metadata {
     }
 
     async fn init_cluster(boot_strap: &str, connections: Arc<RpcClient>) -> 
Result<Cluster> {
-        let socket_address = boot_strap.parse::<SocketAddr>().unwrap();
+        let socket_address =
+            boot_strap
+                .parse::<SocketAddr>()
+                .map_err(|e| Error::IllegalArgument {
+                    message: format!("Invalid bootstrap address 
'{boot_strap}': {e}"),
+                })?;
         let server_node = ServerNode::new(
             -1,
             socket_address.ip().to_string(),
diff --git a/crates/fluss/src/client/write/accumulator.rs 
b/crates/fluss/src/client/write/accumulator.rs
index 96114fb..624e7c4 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -96,9 +96,9 @@ impl RecordAccumulator {
         }
 
         let table_path = &record.table_path;
-        let table_info = cluster.get_table(table_path);
+        let table_info = cluster.get_table(table_path)?;
         let arrow_compression_info = 
table_info.get_table_config().get_arrow_compression_info()?;
-        let row_type = &cluster.get_table(table_path).row_type;
+        let row_type = &table_info.row_type;
 
         let schema_id = table_info.schema_id;
 
@@ -188,7 +188,7 @@ impl RecordAccumulator {
         self.append_new_batch(cluster, record, bucket_id, &mut dq_guard)
     }
 
-    pub async fn ready(&self, cluster: &Arc<Cluster>) -> ReadyCheckResult {
+    pub async fn ready(&self, cluster: &Arc<Cluster>) -> 
Result<ReadyCheckResult> {
         // Snapshot just the Arcs we need, avoiding cloning the entire 
BucketAndWriteBatches struct
         let entries: Vec<(TablePath, BucketBatches)> = self
             .write_batches
@@ -219,14 +219,14 @@ impl RecordAccumulator {
                     cluster,
                     next_ready_check_delay_ms,
                 )
-                .await
+                .await?
         }
 
-        ReadyCheckResult {
+        Ok(ReadyCheckResult {
             ready_nodes,
             next_ready_check_delay_ms,
             unknown_leader_tables,
-        }
+        })
     }
 
     async fn bucket_ready(
@@ -237,7 +237,7 @@ impl RecordAccumulator {
         unknown_leader_tables: &mut HashSet<TablePath>,
         cluster: &Cluster,
         next_ready_check_delay_ms: i64,
-    ) -> i64 {
+    ) -> Result<i64> {
         let mut next_delay = next_ready_check_delay_ms;
 
         for (bucket_id, batch) in bucket_batches {
@@ -250,7 +250,7 @@ impl RecordAccumulator {
             let waited_time_ms = batch.waited_time_ms(current_time_ms());
             let deque_size = batch_guard.len();
             let full = deque_size > 1 || batch.is_closed();
-            let table_bucket = cluster.get_table_bucket(table_path, bucket_id);
+            let table_bucket = cluster.get_table_bucket(table_path, 
bucket_id)?;
             if let Some(leader) = cluster.leader_for(&table_bucket) {
                 next_delay =
                     self.batch_ready(leader, waited_time_ms, full, 
ready_nodes, next_delay);
@@ -258,7 +258,7 @@ impl RecordAccumulator {
                 unknown_leader_tables.insert(table_path.clone());
             }
         }
-        next_delay
+        Ok(next_delay)
     }
 
     fn batch_ready(
diff --git a/crates/fluss/src/client/write/sender.rs 
b/crates/fluss/src/client/write/sender.rs
index 1ffda58..905ef80 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -78,7 +78,7 @@ impl Sender {
 
     async fn run_once(&self) -> Result<()> {
         let cluster = self.metadata.get_cluster();
-        let ready_check_result = self.accumulator.ready(&cluster).await;
+        let ready_check_result = self.accumulator.ready(&cluster).await?;
 
         // Update metadata if needed
         if !ready_check_result.unknown_leader_tables.is_empty() {
diff --git a/crates/fluss/src/cluster/cluster.rs 
b/crates/fluss/src/cluster/cluster.rs
index 2484026..0b14fe6 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -17,7 +17,7 @@
 
 use crate::BucketId;
 use crate::cluster::{BucketLocation, ServerNode, ServerType};
-use crate::error::Result;
+use crate::error::{Error, Result};
 use crate::metadata::{
     JsonSerde, PhysicalTablePath, TableBucket, TableDescriptor, TableInfo, 
TablePath,
 };
@@ -188,7 +188,14 @@ impl Cluster {
             let table_id = table_metadata.table_id;
             let table_path = from_pb_table_path(&table_metadata.table_path);
             let table_descriptor = TableDescriptor::deserialize_json(
-                
&serde_json::from_slice(table_metadata.table_json.as_slice()).unwrap(),
+                
&serde_json::from_slice(table_metadata.table_json.as_slice()).map_err(|e| {
+                    Error::JsonSerdeError {
+                        message: format!(
+                            "Error deserializing table_json into 
TableDescriptor for table_id {} and table_path {}: {}",
+                            table_id, table_path, e
+                        )
+                    }
+                })?,
             )?;
             let table_info = TableInfo::of(
                 table_path.clone(),
@@ -261,9 +268,13 @@ impl Cluster {
         self.alive_tablet_servers_by_id.get(&id)
     }
 
-    pub fn get_table_bucket(&self, table_path: &TablePath, bucket_id: 
BucketId) -> TableBucket {
-        let table_info = self.get_table(table_path);
-        TableBucket::new(table_info.table_id, bucket_id)
+    pub fn get_table_bucket(
+        &self,
+        table_path: &TablePath,
+        bucket_id: BucketId,
+    ) -> Result<TableBucket> {
+        let table_info = self.get_table(table_path)?;
+        Ok(TableBucket::new(table_info.table_id, bucket_id))
     }
 
     pub fn get_bucket_locations_by_path(&self) -> &HashMap<TablePath, 
Vec<BucketLocation>> {
@@ -306,10 +317,12 @@ impl Cluster {
             .num_buckets
     }
 
-    pub fn get_table(&self, table_path: &TablePath) -> &TableInfo {
+    pub fn get_table(&self, table_path: &TablePath) -> Result<&TableInfo> {
         self.table_info_by_path
             .get(table_path)
-            .unwrap_or_else(|| panic!("can't find table info by path 
{table_path}"))
+            .ok_or_else(|| Error::InvalidTableError {
+                message: format!("Table info not found for {table_path}"),
+            })
     }
 
     pub fn opt_get_table(&self, table_path: &TablePath) -> Option<&TableInfo> {
diff --git a/crates/fluss/src/io/file_io.rs b/crates/fluss/src/io/file_io.rs
index e7b026d..adca333 100644
--- a/crates/fluss/src/io/file_io.rs
+++ b/crates/fluss/src/io/file_io.rs
@@ -39,8 +39,8 @@ pub struct FileIO {
 impl FileIO {
     /// Try to infer file io scheme from path.
     pub fn from_url(path: &str) -> Result<FileIOBuilder> {
-        let url = Url::parse(path).map_err(|_| Error::IllegalArgument {
-            message: format!("Invalid URL: {path}"),
+        let url = Url::parse(path).map_err(|e| Error::IllegalArgument {
+            message: format!("Invalid URL '{path}': {e}"),
         })?;
         Ok(FileIOBuilder::new(url.scheme()))
     }
diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs 
b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
index 0b65500..8370764 100644
--- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
@@ -29,6 +29,7 @@ use crate::record::kv::kv_record_batch::{
 };
 use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, NO_BATCH_SEQUENCE, 
NO_WRITER_ID};
 use bytes::{Bytes, BytesMut};
+use log::warn;
 use std::io;
 
 /// Builder for KvRecordBatch.
@@ -305,7 +306,7 @@ impl Drop for KvRecordBatchBuilder {
     fn drop(&mut self) {
         // Warn if the builder has records but was never built or was aborted
         if self.current_record_number > 0 && !self.aborted && 
self.built_buffer.is_none() {
-            eprintln!(
+            warn!(
                 "Warning: KvRecordBatchBuilder dropped with {} record(s) that 
were never built. \
                  Call build() to serialize the batch before dropping.",
                 self.current_record_number
diff --git a/crates/fluss/src/row/encode/mod.rs 
b/crates/fluss/src/row/encode/mod.rs
index 468d4d1..d5cf8ac 100644
--- a/crates/fluss/src/row/encode/mod.rs
+++ b/crates/fluss/src/row/encode/mod.rs
@@ -18,7 +18,7 @@
 mod compacted_key_encoder;
 mod compacted_row_encoder;
 
-use crate::error::Result;
+use crate::error::{Error, Result};
 use crate::metadata::{DataLakeFormat, KvFormat, RowType};
 use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder;
 use crate::row::encode::compacted_row_encoder::CompactedRowEncoder;
@@ -48,15 +48,15 @@ impl KeyEncoderFactory {
         data_lake_format: &Option<DataLakeFormat>,
     ) -> Result<Box<dyn KeyEncoder>> {
         match data_lake_format {
-            Some(DataLakeFormat::Paimon) => {
-                unimplemented!("KeyEncoder for Paimon format is currently 
unimplemented")
-            }
+            Some(DataLakeFormat::Paimon) => Err(Error::UnsupportedOperation {
+                message: "KeyEncoder for Paimon format is not yet 
implemented".to_string(),
+            }),
             Some(DataLakeFormat::Lance) => 
Ok(Box::new(CompactedKeyEncoder::create_key_encoder(
                 row_type, key_fields,
             )?)),
-            Some(DataLakeFormat::Iceberg) => {
-                unimplemented!("KeyEncoder for Iceberg format is currently 
unimplemented")
-            }
+            Some(DataLakeFormat::Iceberg) => Err(Error::UnsupportedOperation {
+                message: "KeyEncoder for Iceberg format is not yet 
implemented".to_string(),
+            }),
             None => Ok(Box::new(CompactedKeyEncoder::create_key_encoder(
                 row_type, key_fields,
             )?)),
diff --git a/crates/fluss/src/rpc/server_connection.rs 
b/crates/fluss/src/rpc/server_connection.rs
index 441b175..7504e2a 100644
--- a/crates/fluss/src/rpc/server_connection.rs
+++ b/crates/fluss/src/rpc/server_connection.rs
@@ -26,6 +26,7 @@ use crate::rpc::message::{
 };
 use crate::rpc::transport::Transport;
 use futures::future::BoxFuture;
+use log::warn;
 use parking_lot::{Mutex, RwLock};
 use std::collections::HashMap;
 use std::io::Cursor;
@@ -66,29 +67,25 @@ impl RpcClient {
         server_node: &ServerNode,
     ) -> Result<ServerConnection, RpcError> {
         let server_id = server_node.uid();
-        let connection = {
+        {
             let connections = self.connections.read();
-            connections.get(server_id).cloned()
-        };
-
-        if let Some(conn) = connection {
-            if !conn.is_poisoned() {
-                return Ok(conn);
+            if let Some(conn) = connections.get(server_id).cloned() {
+                if !conn.is_poisoned() {
+                    return Ok(conn);
+                }
             }
         }
-
-        let new_server = match self.connect(server_node).await {
-            Ok(new_server) => new_server,
-            Err(e) => {
-                self.connections.write().remove(server_id);
-                return Err(e);
+        let new_server = self.connect(server_node).await?;
+        {
+            let mut connections = self.connections.write();
+            if let Some(race_conn) = connections.get(server_id) {
+                if !race_conn.is_poisoned() {
+                    return Ok(race_conn.clone());
+                }
             }
-        };
-
-        self.connections
-            .write()
-            .insert(server_id.clone(), new_server.clone());
 
+            connections.insert(server_id.clone(), new_server.clone());
+        }
         Ok(new_server)
     }
 
@@ -253,7 +250,7 @@ where
         R: RequestBody + Send + WriteVersionedType<Vec<u8>>,
         R::ResponseBody: ReadVersionedType<Cursor<Vec<u8>>>,
     {
-        let request_id = self.request_id.fetch_add(1, Ordering::SeqCst);
+        let request_id = self.request_id.fetch_add(1, Ordering::SeqCst) & 
0x7FFFFFFF;
         let header = RequestHeader {
             request_api_key: R::API_KEY,
             request_api_version: ApiVersion(0),
@@ -290,7 +287,10 @@ where
 
         self.send_message(buf).await?;
         _cleanup_on_cancel.message_sent();
-        let mut response = rx.await.expect("Who closed this channel?!")?;
+        let mut response = rx.await.map_err(|e| Error::UnexpectedError {
+            message: "Got recvError, some one close the channel".to_string(),
+            source: Some(Box::new(e)),
+        })??;
 
         if let Some(error_response) = response.header.error_response {
             return Err(Error::FlussAPIError {
@@ -395,6 +395,31 @@ where
     }
 }
 
+impl<F> Drop for CancellationSafeFuture<F>
+where
+    F: Future + Send + 'static,
+{
+    fn drop(&mut self) {
+        // If the future hasn't finished yet, we must ensure it completes in 
the background.
+        // This prevents leaving half-sent messages on the wire if the caller 
cancels the request.
+        if let Some(fut) = self.inner.take() {
+            // Attempt to get a handle to the current Tokio runtime.
+            // This avoids a panic if the runtime has already shut down.
+            if let Ok(handle) = tokio::runtime::Handle::try_current() {
+                handle.spawn(async move {
+                    let _ = fut.await;
+                });
+            } else {
+                // Fallback: If no runtime is active, we cannot spawn.
+                // At this point, the future 'fut' will be dropped.
+                // Since the runtime is likely shutting down anyway,
+                // the underlying connection is probably being closed.
+                warn!("Tokio runtime not found during drop; background task 
cancelled.");
+            }
+        }
+    }
+}
+
 /// Helper that ensures that a request is removed when a request is cancelled 
before it was actually sent out.
 struct CleanupRequestStateOnCancel {
     state: Arc<Mutex<ConnectionState>>,
diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs
index b987fe2..ee8dde4 100644
--- a/crates/fluss/src/util/mod.rs
+++ b/crates/fluss/src/util/mod.rs
@@ -30,7 +30,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
 pub fn current_time_ms() -> i64 {
     SystemTime::now()
         .duration_since(UNIX_EPOCH)
-        .unwrap()
+        .unwrap_or(std::time::Duration::ZERO)
         .as_millis() as i64
 }
 
diff --git a/crates/fluss/tests/integration/admin.rs 
b/crates/fluss/tests/integration/admin.rs
index 9842a5a..e94b67c 100644
--- a/crates/fluss/tests/integration/admin.rs
+++ b/crates/fluss/tests/integration/admin.rs
@@ -96,7 +96,10 @@ mod admin_test {
         assert_eq!(db_info.database_descriptor(), &db_descriptor);
 
         // drop database
-        admin.drop_database(db_name, false, true).await;
+        admin
+            .drop_database(db_name, false, true)
+            .await
+            .expect("should drop_database");
 
         // database shouldn't exist now
         assert_eq!(admin.database_exists(db_name).await.unwrap(), false);
@@ -218,7 +221,10 @@ mod admin_test {
         assert_eq!(admin.table_exists(&table_path).await.unwrap(), false);
 
         // drop database
-        admin.drop_database(test_db_name, false, true).await;
+        admin
+            .drop_database(test_db_name, false, true)
+            .await
+            .expect("Should drop database");
 
         // database shouldn't exist now
         assert_eq!(admin.database_exists(test_db_name).await.unwrap(), false);
@@ -361,7 +367,10 @@ mod admin_test {
             .drop_table(&table_path, true)
             .await
             .expect("Failed to drop table");
-        admin.drop_database(test_db_name, true, true).await;
+        admin
+            .drop_database(test_db_name, true, true)
+            .await
+            .expect("Should drop database");
     }
 
     #[tokio::test]

Reply via email to