This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 2b9d1d8 chore: minor fix for rust code (#218)
2b9d1d8 is described below
commit 2b9d1d8c3238a7aaa3d7514d5caebdb661bd6214
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Jan 28 21:52:02 2026 +0800
chore: minor fix for rust code (#218)
---
crates/fluss/src/client/admin.rs | 24 ++++----
crates/fluss/src/client/connection.rs | 22 ++++++--
crates/fluss/src/client/metadata.rs | 9 ++-
crates/fluss/src/client/write/accumulator.rs | 18 +++---
crates/fluss/src/client/write/sender.rs | 2 +-
crates/fluss/src/cluster/cluster.rs | 27 ++++++---
crates/fluss/src/io/file_io.rs | 4 +-
.../fluss/src/record/kv/kv_record_batch_builder.rs | 3 +-
crates/fluss/src/row/encode/mod.rs | 14 ++---
crates/fluss/src/rpc/server_connection.rs | 65 +++++++++++++++-------
crates/fluss/src/util/mod.rs | 2 +-
crates/fluss/tests/integration/admin.rs | 15 ++++-
12 files changed, 136 insertions(+), 69 deletions(-)
diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs
index bffe0f5..286c46c 100644
--- a/crates/fluss/src/client/admin.rs
+++ b/crates/fluss/src/client/admin.rs
@@ -47,14 +47,15 @@ pub struct FlussAdmin {
impl FlussAdmin {
pub async fn new(connections: Arc<RpcClient>, metadata: Arc<Metadata>) -> Result<Self> {
- let admin_con = connections
- .get_connection(
- metadata
- .get_cluster()
- .get_coordinator_server()
- .expect("Couldn't coordinator server"),
- )
- .await?;
+ let admin_con =
+ connections
+ .get_connection(metadata.get_cluster().get_coordinator_server().ok_or_else(
+ || Error::UnexpectedError {
+ message: "Coordinator server not found in cluster metadata".to_string(),
+ source: None,
+ },
+ )?)
+ .await?;
Ok(FlussAdmin {
admin_gateway: admin_con,
@@ -211,7 +212,7 @@ impl FlussAdmin {
database_name: &str,
ignore_if_not_exists: bool,
cascade: bool,
- ) {
+ ) -> Result<()> {
let _response = self
.admin_gateway
.request(DropDatabaseRequest::new(
@@ -219,7 +220,8 @@ impl FlussAdmin {
ignore_if_not_exists,
cascade,
))
- .await;
+ .await?;
+ Ok(())
}
/// List all databases
@@ -298,7 +300,7 @@ impl FlussAdmin {
}
let cluster = self.metadata.get_cluster();
- let table_id = cluster.get_table(table_path).table_id;
+ let table_id = cluster.get_table(table_path)?.table_id;
// Prepare requests
let requests_by_server =
diff --git a/crates/fluss/src/client/connection.rs b/crates/fluss/src/client/connection.rs
index 0e41bbe..a19dbd2 100644
--- a/crates/fluss/src/client/connection.rs
+++ b/crates/fluss/src/client/connection.rs
@@ -68,19 +68,31 @@ impl FlussConnection {
}
pub fn get_or_create_writer_client(&self) -> Result<Arc<WriterClient>> {
+ // 1. Fast path: Attempt to acquire a read lock to check if the client already exists.
if let Some(client) = self.writer_client.read().as_ref() {
return Ok(client.clone());
}
- // If not exists, create new one
- let client = Arc::new(WriterClient::new(self.args.clone(), self.metadata.clone())?);
- *self.writer_client.write() = Some(client.clone());
- Ok(client)
+ // 2. Slow path: Acquire the write lock.
+ let mut writer_guard = self.writer_client.write();
+
+ // 3. Double-check: Another thread might have initialized the client
+ // while this thread was waiting for the write lock.
+ if let Some(client) = writer_guard.as_ref() {
+ return Ok(client.clone());
+ }
+
+ // 4. Initialize the client since we are certain it doesn't exist yet.
+ let new_client = Arc::new(WriterClient::new(self.args.clone(), self.metadata.clone())?);
+
+ // 5. Store and return the newly created client.
+ *writer_guard = Some(new_client.clone());
+ Ok(new_client)
}
pub async fn get_table(&self, table_path: &TablePath) -> Result<FlussTable<'_>> {
self.metadata.update_table_metadata(table_path).await?;
- let table_info = self.metadata.get_cluster().get_table(table_path).clone();
+ let table_info = self.metadata.get_cluster().get_table(table_path)?.clone();
if table_info.is_partitioned() {
return Err(crate::error::Error::UnsupportedOperation {
message: "Partitioned tables are not supported".to_string(),
diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs
index 3c6730b..614f6e7 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::cluster::{Cluster, ServerNode, ServerType};
-use crate::error::Result;
+use crate::error::{Error, Result};
use crate::metadata::{PhysicalTablePath, TableBucket, TablePath};
use crate::proto::MetadataResponse;
use crate::rpc::message::UpdateMetadataRequest;
@@ -45,7 +45,12 @@ impl Metadata {
}
async fn init_cluster(boot_strap: &str, connections: Arc<RpcClient>) -> Result<Cluster> {
- let socket_address = boot_strap.parse::<SocketAddr>().unwrap();
+ let socket_address =
+ boot_strap
+ .parse::<SocketAddr>()
+ .map_err(|e| Error::IllegalArgument {
+ message: format!("Invalid bootstrap address '{boot_strap}': {e}"),
+ })?;
let server_node = ServerNode::new(
-1,
socket_address.ip().to_string(),
diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs
index 96114fb..624e7c4 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -96,9 +96,9 @@ impl RecordAccumulator {
}
let table_path = &record.table_path;
- let table_info = cluster.get_table(table_path);
+ let table_info = cluster.get_table(table_path)?;
let arrow_compression_info = table_info.get_table_config().get_arrow_compression_info()?;
- let row_type = &cluster.get_table(table_path).row_type;
+ let row_type = &table_info.row_type;
let schema_id = table_info.schema_id;
@@ -188,7 +188,7 @@ impl RecordAccumulator {
self.append_new_batch(cluster, record, bucket_id, &mut dq_guard)
}
- pub async fn ready(&self, cluster: &Arc<Cluster>) -> ReadyCheckResult {
+ pub async fn ready(&self, cluster: &Arc<Cluster>) -> Result<ReadyCheckResult> {
// Snapshot just the Arcs we need, avoiding cloning the entire BucketAndWriteBatches struct
let entries: Vec<(TablePath, BucketBatches)> = self
.write_batches
@@ -219,14 +219,14 @@ impl RecordAccumulator {
cluster,
next_ready_check_delay_ms,
)
- .await
+ .await?
}
- ReadyCheckResult {
+ Ok(ReadyCheckResult {
ready_nodes,
next_ready_check_delay_ms,
unknown_leader_tables,
- }
+ })
}
async fn bucket_ready(
@@ -237,7 +237,7 @@ impl RecordAccumulator {
unknown_leader_tables: &mut HashSet<TablePath>,
cluster: &Cluster,
next_ready_check_delay_ms: i64,
- ) -> i64 {
+ ) -> Result<i64> {
let mut next_delay = next_ready_check_delay_ms;
for (bucket_id, batch) in bucket_batches {
@@ -250,7 +250,7 @@ impl RecordAccumulator {
let waited_time_ms = batch.waited_time_ms(current_time_ms());
let deque_size = batch_guard.len();
let full = deque_size > 1 || batch.is_closed();
- let table_bucket = cluster.get_table_bucket(table_path, bucket_id);
+ let table_bucket = cluster.get_table_bucket(table_path, bucket_id)?;
if let Some(leader) = cluster.leader_for(&table_bucket) {
next_delay =
self.batch_ready(leader, waited_time_ms, full, ready_nodes, next_delay);
@@ -258,7 +258,7 @@ impl RecordAccumulator {
unknown_leader_tables.insert(table_path.clone());
}
}
- next_delay
+ Ok(next_delay)
}
fn batch_ready(
diff --git a/crates/fluss/src/client/write/sender.rs b/crates/fluss/src/client/write/sender.rs
index 1ffda58..905ef80 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -78,7 +78,7 @@ impl Sender {
async fn run_once(&self) -> Result<()> {
let cluster = self.metadata.get_cluster();
- let ready_check_result = self.accumulator.ready(&cluster).await;
+ let ready_check_result = self.accumulator.ready(&cluster).await?;
// Update metadata if needed
if !ready_check_result.unknown_leader_tables.is_empty() {
diff --git a/crates/fluss/src/cluster/cluster.rs b/crates/fluss/src/cluster/cluster.rs
index 2484026..0b14fe6 100644
--- a/crates/fluss/src/cluster/cluster.rs
+++ b/crates/fluss/src/cluster/cluster.rs
@@ -17,7 +17,7 @@
use crate::BucketId;
use crate::cluster::{BucketLocation, ServerNode, ServerType};
-use crate::error::Result;
+use crate::error::{Error, Result};
use crate::metadata::{
JsonSerde, PhysicalTablePath, TableBucket, TableDescriptor, TableInfo, TablePath,
};
@@ -188,7 +188,14 @@ impl Cluster {
let table_id = table_metadata.table_id;
let table_path = from_pb_table_path(&table_metadata.table_path);
let table_descriptor = TableDescriptor::deserialize_json(
- &serde_json::from_slice(table_metadata.table_json.as_slice()).unwrap(),
+ &serde_json::from_slice(table_metadata.table_json.as_slice()).map_err(|e| {
+ Error::JsonSerdeError {
+ message: format!(
+ "Error deserializing table_json into TableDescriptor for table_id {} and table_path {}: {}",
+ table_id, table_path, e
+ )
+ }
+ })?,
)?;
let table_info = TableInfo::of(
table_path.clone(),
@@ -261,9 +268,13 @@ impl Cluster {
self.alive_tablet_servers_by_id.get(&id)
}
- pub fn get_table_bucket(&self, table_path: &TablePath, bucket_id: BucketId) -> TableBucket {
- let table_info = self.get_table(table_path);
- TableBucket::new(table_info.table_id, bucket_id)
+ pub fn get_table_bucket(
+ &self,
+ table_path: &TablePath,
+ bucket_id: BucketId,
+ ) -> Result<TableBucket> {
+ let table_info = self.get_table(table_path)?;
+ Ok(TableBucket::new(table_info.table_id, bucket_id))
}
pub fn get_bucket_locations_by_path(&self) -> &HashMap<TablePath, Vec<BucketLocation>> {
@@ -306,10 +317,12 @@ impl Cluster {
.num_buckets
}
- pub fn get_table(&self, table_path: &TablePath) -> &TableInfo {
+ pub fn get_table(&self, table_path: &TablePath) -> Result<&TableInfo> {
self.table_info_by_path
.get(table_path)
- .unwrap_or_else(|| panic!("can't find table info by path {table_path}"))
+ .ok_or_else(|| Error::InvalidTableError {
+ message: format!("Table info not found for {table_path}"),
+ })
}
pub fn opt_get_table(&self, table_path: &TablePath) -> Option<&TableInfo> {
diff --git a/crates/fluss/src/io/file_io.rs b/crates/fluss/src/io/file_io.rs
index e7b026d..adca333 100644
--- a/crates/fluss/src/io/file_io.rs
+++ b/crates/fluss/src/io/file_io.rs
@@ -39,8 +39,8 @@ pub struct FileIO {
impl FileIO {
/// Try to infer file io scheme from path.
pub fn from_url(path: &str) -> Result<FileIOBuilder> {
- let url = Url::parse(path).map_err(|_| Error::IllegalArgument {
- message: format!("Invalid URL: {path}"),
+ let url = Url::parse(path).map_err(|e| Error::IllegalArgument {
+ message: format!("Invalid URL '{path}': {e}"),
})?;
Ok(FileIOBuilder::new(url.scheme()))
}
diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
index 0b65500..8370764 100644
--- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
@@ -29,6 +29,7 @@ use crate::record::kv::kv_record_batch::{
};
use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, NO_BATCH_SEQUENCE, NO_WRITER_ID};
use bytes::{Bytes, BytesMut};
+use log::warn;
use std::io;
/// Builder for KvRecordBatch.
@@ -305,7 +306,7 @@ impl Drop for KvRecordBatchBuilder {
fn drop(&mut self) {
// Warn if the builder has records but was never built or was aborted
if self.current_record_number > 0 && !self.aborted && self.built_buffer.is_none() {
- eprintln!(
+ warn!(
"Warning: KvRecordBatchBuilder dropped with {} record(s) that were never built. \
Call build() to serialize the batch before dropping.",
self.current_record_number
diff --git a/crates/fluss/src/row/encode/mod.rs b/crates/fluss/src/row/encode/mod.rs
index 468d4d1..d5cf8ac 100644
--- a/crates/fluss/src/row/encode/mod.rs
+++ b/crates/fluss/src/row/encode/mod.rs
@@ -18,7 +18,7 @@
mod compacted_key_encoder;
mod compacted_row_encoder;
-use crate::error::Result;
+use crate::error::{Error, Result};
use crate::metadata::{DataLakeFormat, KvFormat, RowType};
use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder;
use crate::row::encode::compacted_row_encoder::CompactedRowEncoder;
@@ -48,15 +48,15 @@ impl KeyEncoderFactory {
data_lake_format: &Option<DataLakeFormat>,
) -> Result<Box<dyn KeyEncoder>> {
match data_lake_format {
- Some(DataLakeFormat::Paimon) => {
- unimplemented!("KeyEncoder for Paimon format is currently unimplemented")
- }
+ Some(DataLakeFormat::Paimon) => Err(Error::UnsupportedOperation {
+ message: "KeyEncoder for Paimon format is not yet implemented".to_string(),
+ }),
Some(DataLakeFormat::Lance) => Ok(Box::new(CompactedKeyEncoder::create_key_encoder(
row_type, key_fields,
)?)),
- Some(DataLakeFormat::Iceberg) => {
- unimplemented!("KeyEncoder for Iceberg format is currently unimplemented")
- }
+ Some(DataLakeFormat::Iceberg) => Err(Error::UnsupportedOperation {
+ message: "KeyEncoder for Iceberg format is not yet implemented".to_string(),
+ }),
None => Ok(Box::new(CompactedKeyEncoder::create_key_encoder(
row_type, key_fields,
)?)),
diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs
index 441b175..7504e2a 100644
--- a/crates/fluss/src/rpc/server_connection.rs
+++ b/crates/fluss/src/rpc/server_connection.rs
@@ -26,6 +26,7 @@ use crate::rpc::message::{
};
use crate::rpc::transport::Transport;
use futures::future::BoxFuture;
+use log::warn;
use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
use std::io::Cursor;
@@ -66,29 +67,25 @@ impl RpcClient {
server_node: &ServerNode,
) -> Result<ServerConnection, RpcError> {
let server_id = server_node.uid();
- let connection = {
+ {
let connections = self.connections.read();
- connections.get(server_id).cloned()
- };
-
- if let Some(conn) = connection {
- if !conn.is_poisoned() {
- return Ok(conn);
+ if let Some(conn) = connections.get(server_id).cloned() {
+ if !conn.is_poisoned() {
+ return Ok(conn);
+ }
}
}
-
- let new_server = match self.connect(server_node).await {
- Ok(new_server) => new_server,
- Err(e) => {
- self.connections.write().remove(server_id);
- return Err(e);
+ let new_server = self.connect(server_node).await?;
+ {
+ let mut connections = self.connections.write();
+ if let Some(race_conn) = connections.get(server_id) {
+ if !race_conn.is_poisoned() {
+ return Ok(race_conn.clone());
+ }
}
- };
-
- self.connections
- .write()
- .insert(server_id.clone(), new_server.clone());
+ connections.insert(server_id.clone(), new_server.clone());
+ }
Ok(new_server)
}
@@ -253,7 +250,7 @@ where
R: RequestBody + Send + WriteVersionedType<Vec<u8>>,
R::ResponseBody: ReadVersionedType<Cursor<Vec<u8>>>,
{
- let request_id = self.request_id.fetch_add(1, Ordering::SeqCst);
+ let request_id = self.request_id.fetch_add(1, Ordering::SeqCst) & 0x7FFFFFFF;
let header = RequestHeader {
request_api_key: R::API_KEY,
request_api_version: ApiVersion(0),
@@ -290,7 +287,10 @@ where
self.send_message(buf).await?;
_cleanup_on_cancel.message_sent();
- let mut response = rx.await.expect("Who closed this channel?!")?;
+ let mut response = rx.await.map_err(|e| Error::UnexpectedError {
+ message: "Got recvError, some one close the channel".to_string(),
+ source: Some(Box::new(e)),
+ })??;
if let Some(error_response) = response.header.error_response {
return Err(Error::FlussAPIError {
@@ -395,6 +395,31 @@ where
}
}
+impl<F> Drop for CancellationSafeFuture<F>
+where
+ F: Future + Send + 'static,
+{
+ fn drop(&mut self) {
+ // If the future hasn't finished yet, we must ensure it completes in the background.
+ // This prevents leaving half-sent messages on the wire if the caller cancels the request.
+ if let Some(fut) = self.inner.take() {
+ // Attempt to get a handle to the current Tokio runtime.
+ // This avoids a panic if the runtime has already shut down.
+ if let Ok(handle) = tokio::runtime::Handle::try_current() {
+ handle.spawn(async move {
+ let _ = fut.await;
+ });
+ } else {
+ // Fallback: If no runtime is active, we cannot spawn.
+ // At this point, the future 'fut' will be dropped.
+ // Since the runtime is likely shutting down anyway,
+ // the underlying connection is probably being closed.
+ warn!("Tokio runtime not found during drop; background task cancelled.");
+ }
+ }
+ }
+}
+
/// Helper that ensures that a request is removed when a request is cancelled before it was actually sent out.
struct CleanupRequestStateOnCancel {
state: Arc<Mutex<ConnectionState>>,
diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs
index b987fe2..ee8dde4 100644
--- a/crates/fluss/src/util/mod.rs
+++ b/crates/fluss/src/util/mod.rs
@@ -30,7 +30,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
pub fn current_time_ms() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
- .unwrap()
+ .unwrap_or(std::time::Duration::ZERO)
.as_millis() as i64
}
diff --git a/crates/fluss/tests/integration/admin.rs b/crates/fluss/tests/integration/admin.rs
index 9842a5a..e94b67c 100644
--- a/crates/fluss/tests/integration/admin.rs
+++ b/crates/fluss/tests/integration/admin.rs
@@ -96,7 +96,10 @@ mod admin_test {
assert_eq!(db_info.database_descriptor(), &db_descriptor);
// drop database
- admin.drop_database(db_name, false, true).await;
+ admin
+ .drop_database(db_name, false, true)
+ .await
+ .expect("should drop_database");
// database shouldn't exist now
assert_eq!(admin.database_exists(db_name).await.unwrap(), false);
@@ -218,7 +221,10 @@ mod admin_test {
assert_eq!(admin.table_exists(&table_path).await.unwrap(), false);
// drop database
- admin.drop_database(test_db_name, false, true).await;
+ admin
+ .drop_database(test_db_name, false, true)
+ .await
+ .expect("Should drop database");
// database shouldn't exist now
assert_eq!(admin.database_exists(test_db_name).await.unwrap(), false);
@@ -361,7 +367,10 @@ mod admin_test {
.drop_table(&table_path, true)
.await
.expect("Failed to drop table");
- admin.drop_database(test_db_name, true, true).await;
+ admin
+ .drop_database(test_db_name, true, true)
+ .await
+ .expect("Should drop database");
}
#[tokio::test]