This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 094ff3e feat: Add round robin bucket assigner (#360)
094ff3e is described below
commit 094ff3e10a65aa3cec4342969b10178c509e57bf
Author: Kaiqi Dong <[email protected]>
AuthorDate: Wed Feb 25 02:05:48 2026 +0100
feat: Add round robin bucket assigner (#360)
---
bindings/cpp/include/fluss.hpp | 2 +
bindings/cpp/src/ffi_converter.hpp | 1 +
bindings/cpp/src/lib.rs | 11 ++++
bindings/python/src/config.rs | 11 ++++
crates/fluss/src/client/write/bucket_assigner.rs | 66 ++++++++++++++++++++++
crates/fluss/src/client/write/writer_client.rs | 30 +++++++---
crates/fluss/src/config.rs | 27 ++++++++-
.../docs/user-guide/cpp/example/configuration.md | 1 +
.../user-guide/python/example/configuration.md | 21 +++----
.../docs/user-guide/rust/example/configuration.md | 15 ++---
10 files changed, 160 insertions(+), 25 deletions(-)
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 6b56ba2..f17cafc 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -979,6 +979,8 @@ struct Configuration {
int32_t writer_retries{std::numeric_limits<int32_t>::max()};
// Writer batch size in bytes (2 MB)
int32_t writer_batch_size{2 * 1024 * 1024};
+ // Bucket assigner for tables without bucket keys: "sticky" or "round_robin"
+ std::string writer_bucket_no_key_assigner{"sticky"};
// Number of remote log batches to prefetch during scanning
size_t scanner_remote_log_prefetch_num{4};
// Number of threads for downloading remote log data
diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp
index 370429b..a2e7fa2 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -51,6 +51,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
ffi_config.writer_acks = rust::String(config.writer_acks);
ffi_config.writer_retries = config.writer_retries;
ffi_config.writer_batch_size = config.writer_batch_size;
+ ffi_config.writer_bucket_no_key_assigner = rust::String(config.writer_bucket_no_key_assigner);
ffi_config.scanner_remote_log_prefetch_num = config.scanner_remote_log_prefetch_num;
ffi_config.remote_file_download_thread_num = config.remote_file_download_thread_num;
ffi_config.scanner_log_max_poll_records = config.scanner_log_max_poll_records;
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 5a1b3db..9fbdc8f 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -43,6 +43,7 @@ mod ffi {
writer_acks: String,
writer_retries: i32,
writer_batch_size: i32,
+ writer_bucket_no_key_assigner: String,
scanner_remote_log_prefetch_num: usize,
remote_file_download_thread_num: usize,
scanner_log_max_poll_records: usize,
@@ -607,12 +608,22 @@ fn err_from_core_error(e: &fcore::error::Error) -> ffi::FfiResult {

// Connection implementation
fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> {
+ let assigner_type = match config.writer_bucket_no_key_assigner.as_str() {
+ "round_robin" => fluss::config::NoKeyAssigner::RoundRobin,
+ "sticky" => fluss::config::NoKeyAssigner::Sticky,
+ other => {
+ return Err(format!(
+ "Unknown bucket assigner type: '{other}', expected 'sticky' or 'round_robin'"
+ ));
+ }
+ };
let config = fluss::config::Config {
bootstrap_servers: config.bootstrap_servers.to_string(),
writer_request_max_size: config.writer_request_max_size,
writer_acks: config.writer_acks.to_string(),
writer_retries: config.writer_retries,
writer_batch_size: config.writer_batch_size,
+ writer_bucket_no_key_assigner: assigner_type,
scanner_remote_log_prefetch_num: config.scanner_remote_log_prefetch_num,
remote_file_download_thread_num: config.remote_file_download_thread_num,
scanner_log_max_poll_records: config.scanner_log_max_poll_records,
diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs
index fdf90b7..75056a5 100644
--- a/bindings/python/src/config.rs
+++ b/bindings/python/src/config.rs
@@ -84,6 +84,17 @@ impl Config {
))
})?;
}
+ "writer.bucket.no-key-assigner" => {
+ config.writer_bucket_no_key_assigner = match value.as_str() {
+ "round_robin" => fcore::config::NoKeyAssigner::RoundRobin,
+ "sticky" => fcore::config::NoKeyAssigner::Sticky,
+ other => {
+ return Err(FlussError::new_err(format!(
+ "Unknown bucket assigner type: {other}, expected 'sticky' or 'round_robin'"
+ )));
+ }
+ };
+ }
_ => {
return Err(FlussError::new_err(format!("Unknown property: {key}")));
}
diff --git a/crates/fluss/src/client/write/bucket_assigner.rs b/crates/fluss/src/client/write/bucket_assigner.rs
index 7fcd20b..8ad38e3 100644
--- a/crates/fluss/src/client/write/bucket_assigner.rs
+++ b/crates/fluss/src/client/write/bucket_assigner.rs
@@ -106,6 +106,44 @@ impl BucketAssigner for StickyBucketAssigner {
}
}

+/// Unlike [StickyBucketAssigner], each record is assigned to the next bucket
+/// in a rotating sequence, providing even data distribution across all buckets.
+pub struct RoundRobinBucketAssigner {
+ table_path: Arc<PhysicalTablePath>,
+ num_buckets: i32,
+ counter: AtomicI32,
+}
+
+impl RoundRobinBucketAssigner {
+ pub fn new(table_path: Arc<PhysicalTablePath>, num_buckets: i32) -> Self {
+ let mut rng = rand::rng();
+ Self {
+ table_path,
+ num_buckets,
+ counter: AtomicI32::new(rng.random()),
+ }
+ }
+}
+
+impl BucketAssigner for RoundRobinBucketAssigner {
+ fn abort_if_batch_full(&self) -> bool {
+ false
+ }
+
+ fn on_new_batch(&self, _cluster: &Cluster, _prev_bucket_id: i32) {}
+
+ fn assign_bucket(&self, _bucket_key: Option<&Bytes>, cluster: &Cluster) -> Result<i32> {
+ let next_value = self.counter.fetch_add(1, Ordering::Relaxed);
+ let available_buckets = cluster.get_available_buckets_for_table_path(&self.table_path);
+ if available_buckets.is_empty() {
+ Ok((next_value & i32::MAX) % self.num_buckets)
+ } else {
+ let idx = (next_value & i32::MAX) % available_buckets.len() as i32;
+ Ok(available_buckets[idx as usize].bucket_id())
+ }
+ }
+}
+
/// A [BucketAssigner] which assigns based on a modulo hashing function
pub struct HashBucketAssigner {
num_buckets: i32,
@@ -173,6 +211,34 @@ mod tests {
assert!((0..2).contains(&next_bucket));
}

+ #[test]
+ fn round_robin_assigner_cycles_through_buckets() {
+ let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+ let num_buckets = 3;
+ let cluster = build_cluster(&table_path, 1, num_buckets);
+ let physical = Arc::new(PhysicalTablePath::of(Arc::new(table_path)));
+ let assigner = RoundRobinBucketAssigner::new(physical, num_buckets);
+
+ let mut seen = Vec::new();
+ for _ in 0..(num_buckets * 2) {
+ let bucket = assigner.assign_bucket(None, &cluster).expect("bucket");
+ assert!((0..num_buckets).contains(&bucket));
+ seen.push(bucket);
+ }
+
+ assert_eq!(seen[0], seen[3]);
+ assert_eq!(seen[1], seen[4]);
+ assert_eq!(seen[2], seen[5]);
+ }
+
+ #[test]
+ fn round_robin_assigner_does_not_abort_on_batch_full() {
+ let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+ let physical = Arc::new(PhysicalTablePath::of(Arc::new(table_path)));
+ let assigner = RoundRobinBucketAssigner::new(physical, 3);
+ assert!(!assigner.abort_if_batch_full());
+ }
+
#[test]
fn hash_bucket_assigner_requires_key() {
let assigner = HashBucketAssigner::new(3, <dyn BucketingFunction>::of(None));
diff --git a/crates/fluss/src/client/write/writer_client.rs b/crates/fluss/src/client/write/writer_client.rs
index 41ef4bb..23f523c 100644
--- a/crates/fluss/src/client/write/writer_client.rs
+++ b/crates/fluss/src/client/write/writer_client.rs
@@ -19,11 +19,12 @@ use crate::BucketId;
use crate::bucketing::BucketingFunction;
use crate::client::metadata::Metadata;
use crate::client::write::bucket_assigner::{
- BucketAssigner, HashBucketAssigner, StickyBucketAssigner,
+ BucketAssigner, HashBucketAssigner, RoundRobinBucketAssigner, StickyBucketAssigner,
};
use crate::client::write::sender::Sender;
use crate::client::{RecordAccumulator, ResultHandle, WriteRecord};
use crate::config::Config;
+use crate::config::NoKeyAssigner;
use crate::error::{Error, Result};
use crate::metadata::{PhysicalTablePath, TableInfo};
use bytes::Bytes;
@@ -99,7 +100,12 @@ impl WriterClient {
let (bucket_assigner, bucket_id) =
self.assign_bucket(&record.table_info, bucket_key, physical_table_path)?;

- let mut result = self.accumulate.append(record, bucket_id, &cluster, true)?;
+ let mut result = self.accumulate.append(
+ record,
+ bucket_id,
+ &cluster,
+ bucket_assigner.abort_if_batch_full(),
+ )?;

if result.abort_record_for_new_batch {
let prev_bucket_id = bucket_id;
@@ -125,10 +131,14 @@ impl WriterClient {
if let Some(assigner) = self.bucket_assigners.get(table_path) {
assigner.clone()
} else {
- let assigner =
- Self::create_bucket_assigner(table_info, Arc::clone(table_path), bucket_key)?;
+ let assigner = Self::create_bucket_assigner(
+ table_info,
+ Arc::clone(table_path),
+ bucket_key,
+ &self.config,
+ )?;
self.bucket_assigners
- .insert(Arc::clone(table_path), Arc::clone(&assigner.clone()));
+ .insert(Arc::clone(table_path), Arc::clone(&assigner));
assigner
}
};
@@ -164,6 +174,7 @@ impl WriterClient {
table_info: &Arc<TableInfo>,
table_path: Arc<PhysicalTablePath>,
bucket_key: Option<&Bytes>,
+ config: &Config,
) -> Result<Arc<dyn BucketAssigner>> {
if bucket_key.is_some() {
let datalake_format = table_info.get_table_config().get_datalake_format()?;
@@ -173,8 +184,13 @@ impl WriterClient {
function,
)))
} else {
- // TODO: Wire up toi use round robin/sticky according to ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER
- Ok(Arc::new(StickyBucketAssigner::new(table_path)))
+ match config.writer_bucket_no_key_assigner {
+ NoKeyAssigner::Sticky => Ok(Arc::new(StickyBucketAssigner::new(table_path))),
+ NoKeyAssigner::RoundRobin => Ok(Arc::new(RoundRobinBucketAssigner::new(
+ table_path,
+ table_info.num_buckets,
+ ))),
+ }
}
}
}
diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs
index ecf7e12..6ff4327 100644
--- a/crates/fluss/src/config.rs
+++ b/crates/fluss/src/config.rs
@@ -15,8 +15,9 @@
// specific language governing permissions and limitations
// under the License.

-use clap::Parser;
+use clap::{Parser, ValueEnum};
use serde::{Deserialize, Serialize};
+use std::fmt;

const DEFAULT_BOOTSTRAP_SERVER: &str = "127.0.0.1:9123";
const DEFAULT_REQUEST_MAX_SIZE: i32 = 10 * 1024 * 1024;
@@ -28,6 +29,26 @@ const DEFAULT_MAX_POLL_RECORDS: usize = 500;

const DEFAULT_ACKS: &str = "all";

+/// Bucket assigner strategy for tables without bucket keys.
+/// Matches Java `client.writer.bucket.no-key-assigner`.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum, Deserialize, Serialize)]
+#[serde(rename_all = "snake_case")]
+pub enum NoKeyAssigner {
+ /// Sticks to one bucket until the batch is full, then switches.
+ Sticky,
+ /// Assigns each record to the next bucket in a rotating sequence.
+ RoundRobin,
+}
+
+impl fmt::Display for NoKeyAssigner {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ NoKeyAssigner::Sticky => write!(f, "sticky"),
+ NoKeyAssigner::RoundRobin => write!(f, "round_robin"),
+ }
+ }
+}
+
#[derive(Parser, Debug, Clone, Deserialize, Serialize)]
#[command(author, version, about, long_about = None)]
pub struct Config {
@@ -46,6 +67,9 @@ pub struct Config {
#[arg(long, default_value_t = DEFAULT_WRITER_BATCH_SIZE)]
pub writer_batch_size: i32,

+ #[arg(long, value_enum, default_value_t = NoKeyAssigner::Sticky)]
+ pub writer_bucket_no_key_assigner: NoKeyAssigner,
+
/// Maximum number of remote log segments to prefetch
/// Default: 4 (matching Java CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM)
#[arg(long, default_value_t = DEFAULT_PREFETCH_NUM)]
@@ -70,6 +94,7 @@ impl Default for Config {
writer_acks: String::from(DEFAULT_ACKS),
writer_retries: i32::MAX,
writer_batch_size: DEFAULT_WRITER_BATCH_SIZE,
+ writer_bucket_no_key_assigner: NoKeyAssigner::Sticky,
scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM,
remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS,
scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
diff --git a/website/docs/user-guide/cpp/example/configuration.md b/website/docs/user-guide/cpp/example/configuration.md
index 518a584..715e3c6 100644
--- a/website/docs/user-guide/cpp/example/configuration.md
+++ b/website/docs/user-guide/cpp/example/configuration.md
@@ -30,6 +30,7 @@ config.writer_request_max_size = 10 * 1024 * 1024; // Max request size (10 M
config.writer_acks = "all"; // Wait for all replicas
config.writer_retries = std::numeric_limits<int32_t>::max(); // Retry on failure
config.writer_batch_size = 2 * 1024 * 1024; // Batch size (2 MB)
+config.writer_bucket_no_key_assigner = "sticky"; // "sticky" or "round_robin"
config.scanner_remote_log_prefetch_num = 4; // Remote log prefetch count
config.remote_file_download_thread_num = 3; // Download threads
```
diff --git a/website/docs/user-guide/python/example/configuration.md b/website/docs/user-guide/python/example/configuration.md
index f2828c4..466bf0d 100644
--- a/website/docs/user-guide/python/example/configuration.md
+++ b/website/docs/user-guide/python/example/configuration.md
@@ -21,16 +21,17 @@ with await fluss.FlussConnection.create(config) as conn:

## Connection Configurations

-| Key | Description | Default |
-|---------------------|-------------------------------------------------------|--------------------|
-| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` |
-| `writer.request-max-size` | Maximum request size in bytes | `10485760` (10 MB) |
-| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` |
-| `writer.retries` | Number of retries on failure | `2147483647` |
-| `writer.batch-size` | Batch size for writes in bytes | `2097152` (2 MB) |
-| `scanner.remote-log.prefetch-num` | Number of remote log segments to prefetch | `4` |
-| `remote-file.download-thread-num` | Number of threads for remote log downloads | `3` |
-| `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` |
+| Key | Description | Default |
+|------------------------------------|--------------------------------------------------------------------------------------|--------------------|
+| `bootstrap.servers` | Coordinator server address | `127.0.0.1:9123` |
+| `writer.request-max-size` | Maximum request size in bytes | `10485760` (10 MB) |
+| `writer.acks` | Acknowledgment setting (`all` waits for all replicas) | `all` |
+| `writer.retries` | Number of retries on failure | `2147483647` |
+| `writer.batch-size` | Batch size for writes in bytes | `2097152` (2 MB) |
+| `writer.bucket.no-key-assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` |
+| `scanner.remote-log.prefetch-num` | Number of remote log segments to prefetch | `4` |
+| `remote-file.download-thread-num` | Number of threads for remote log downloads | `3` |
+| `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` |

Remember to close the connection when done:

diff --git a/website/docs/user-guide/rust/example/configuration.md b/website/docs/user-guide/rust/example/configuration.md
index a84c24d..92b9bf2 100644
--- a/website/docs/user-guide/rust/example/configuration.md
+++ b/website/docs/user-guide/rust/example/configuration.md
@@ -17,10 +17,11 @@ let conn = FlussConnection::new(config).await?;

## Connection Configurations

-| Option | Description | Default |
-|---------------------------|-------------------------------------------------------|------------------|
-| `bootstrap_servers` | Coordinator server address | `127.0.0.1:9123` |
-| `writer_request_max_size` | Maximum request size in bytes | 10 MB |
-| `writer_acks` | Acknowledgment setting (`all` waits for all replicas) | `all` |
-| `writer_retries` | Number of retries on failure | `i32::MAX` |
-| `writer_batch_size` | Batch size for writes | 2 MB |
+| Option | Description | Default |
+|---------------------------------|--------------------------------------------------------------------------------------|------------------|
+| `bootstrap_servers` | Coordinator server address | `127.0.0.1:9123` |
+| `writer_request_max_size` | Maximum request size in bytes | 10 MB |
+| `writer_acks` | Acknowledgment setting (`all` waits for all replicas) | `all` |
+| `writer_retries` | Number of retries on failure | `i32::MAX` |
+| `writer_batch_size` | Batch size for writes | 2 MB |
+| `writer_bucket_no_key_assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky` |