This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new fdfde8d chore: allow propagating hostname as raw bootstrap address (#307)
fdfde8d is described below
commit fdfde8d902751316d080230d24a0f223fe392612
Author: Junfan Zhang <[email protected]>
AuthorDate: Sat Feb 14 08:09:30 2026 +0800
chore: allow propagating hostname as raw bootstrap address (#307)
---
crates/fluss/src/client/metadata.rs | 61 ++++++++++++++++++++++++++++++++-----
1 file changed, 54 insertions(+), 7 deletions(-)
diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs
index ce00ced..3d8e77b 100644
--- a/crates/fluss/src/client/metadata.rs
+++ b/crates/fluss/src/client/metadata.rs
@@ -24,7 +24,7 @@ use crate::rpc::{RpcClient, ServerConnection};
use log::info;
use parking_lot::RwLock;
use std::collections::HashSet;
-use std::net::SocketAddr;
+use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
#[derive(Default)]
@@ -44,13 +44,33 @@ impl Metadata {
})
}
+ fn parse_bootstrap(boot_strap: &str) -> Result<SocketAddr> {
+ // Resolve all socket addresses and deterministically choose one.
+ let addrs = boot_strap
+ .to_socket_addrs()
+ .map_err(|e| Error::IllegalArgument {
+ message: format!("Invalid bootstrap address '{boot_strap}': {e}"),
+ })?;
+
+ // Prefer IPv4 addresses; if none are available, fall back to the first IPv6.
+ let mut ipv6_candidate: Option<SocketAddr> = None;
+ for addr in addrs {
+ if addr.is_ipv4() {
+ return Ok(addr);
+ }
+ if ipv6_candidate.is_none() {
+ ipv6_candidate = Some(addr);
+ }
+ }
+
+ let addr = ipv6_candidate.ok_or_else(|| Error::IllegalArgument {
+ message: format!("Unable to resolve bootstrap address '{boot_strap}'"),
+ })?;
+ Ok(addr)
+ }
+
async fn init_cluster(boot_strap: &str, connections: Arc<RpcClient>) -> Result<Cluster> {
- let socket_address =
- boot_strap
- .parse::<SocketAddr>()
- .map_err(|e| Error::IllegalArgument {
- message: format!("Invalid bootstrap address '{boot_strap}': {e}"),
- })?;
+ let socket_address = Self::parse_bootstrap(boot_strap)?;
let server_node = ServerNode::new(
-1,
socket_address.ip().to_string(),
@@ -273,4 +293,31 @@ mod tests {
let cluster = metadata.get_cluster();
assert!(cluster.get_tablet_server(1).is_none());
}
+
+ #[test]
+ fn parse_bootstrap_variants() {
+ // valid IP
+ let addr = Metadata::parse_bootstrap("127.0.0.1:8080").unwrap();
+ assert_eq!(addr.port(), 8080);
+
+ // valid hostname
+ let addr = Metadata::parse_bootstrap("localhost:9090").unwrap();
+ assert_eq!(addr.port(), 9090);
+
+ // valid IPv6 address
+ let addr = Metadata::parse_bootstrap("[::1]:8080").unwrap();
+ assert_eq!(addr.port(), 8080);
+
+ // invalid input: missing port
+ assert!(Metadata::parse_bootstrap("localhost").is_err());
+
+ // invalid input: out-of-range port
+ assert!(Metadata::parse_bootstrap("localhost:99999").is_err());
+
+ // invalid input: empty string
+ assert!(Metadata::parse_bootstrap("").is_err());
+
+ // invalid input: nonsensical address
+ assert!(Metadata::parse_bootstrap("invalid_address").is_err());
+ }
}