This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 094ff3e  feat: Add round robin bucket assigner (#360)
094ff3e is described below

commit 094ff3e10a65aa3cec4342969b10178c509e57bf
Author: Kaiqi Dong <[email protected]>
AuthorDate: Wed Feb 25 02:05:48 2026 +0100

    feat: Add round robin bucket assigner (#360)
---
 bindings/cpp/include/fluss.hpp                     |  2 +
 bindings/cpp/src/ffi_converter.hpp                 |  1 +
 bindings/cpp/src/lib.rs                            | 11 ++++
 bindings/python/src/config.rs                      | 11 ++++
 crates/fluss/src/client/write/bucket_assigner.rs   | 66 ++++++++++++++++++++++
 crates/fluss/src/client/write/writer_client.rs     | 30 +++++++---
 crates/fluss/src/config.rs                         | 27 ++++++++-
 .../docs/user-guide/cpp/example/configuration.md   |  1 +
 .../user-guide/python/example/configuration.md     | 21 +++----
 .../docs/user-guide/rust/example/configuration.md  | 15 ++---
 10 files changed, 160 insertions(+), 25 deletions(-)

diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 6b56ba2..f17cafc 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -979,6 +979,8 @@ struct Configuration {
     int32_t writer_retries{std::numeric_limits<int32_t>::max()};
     // Writer batch size in bytes (2 MB)
     int32_t writer_batch_size{2 * 1024 * 1024};
+    // Bucket assigner for tables without bucket keys: "sticky" or 
"round_robin"
+    std::string writer_bucket_no_key_assigner{"sticky"};
     // Number of remote log batches to prefetch during scanning
     size_t scanner_remote_log_prefetch_num{4};
     // Number of threads for downloading remote log data
diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp
index 370429b..a2e7fa2 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -51,6 +51,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
     ffi_config.writer_acks = rust::String(config.writer_acks);
     ffi_config.writer_retries = config.writer_retries;
     ffi_config.writer_batch_size = config.writer_batch_size;
+    ffi_config.writer_bucket_no_key_assigner = 
rust::String(config.writer_bucket_no_key_assigner);
     ffi_config.scanner_remote_log_prefetch_num = 
config.scanner_remote_log_prefetch_num;
     ffi_config.remote_file_download_thread_num = 
config.remote_file_download_thread_num;
     ffi_config.scanner_log_max_poll_records = 
config.scanner_log_max_poll_records;
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 5a1b3db..9fbdc8f 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -43,6 +43,7 @@ mod ffi {
         writer_acks: String,
         writer_retries: i32,
         writer_batch_size: i32,
+        writer_bucket_no_key_assigner: String,
         scanner_remote_log_prefetch_num: usize,
         remote_file_download_thread_num: usize,
         scanner_log_max_poll_records: usize,
@@ -607,12 +608,22 @@ fn err_from_core_error(e: &fcore::error::Error) -> ffi::FfiResult {
 
 // Connection implementation
 fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> {
+    let assigner_type = match config.writer_bucket_no_key_assigner.as_str() {
+        "round_robin" => fluss::config::NoKeyAssigner::RoundRobin,
+        "sticky" => fluss::config::NoKeyAssigner::Sticky,
+        other => {
+            return Err(format!(
+                "Unknown bucket assigner type: '{other}', expected 'sticky' or 
'round_robin'"
+            ));
+        }
+    };
     let config = fluss::config::Config {
         bootstrap_servers: config.bootstrap_servers.to_string(),
         writer_request_max_size: config.writer_request_max_size,
         writer_acks: config.writer_acks.to_string(),
         writer_retries: config.writer_retries,
         writer_batch_size: config.writer_batch_size,
+        writer_bucket_no_key_assigner: assigner_type,
         scanner_remote_log_prefetch_num: 
config.scanner_remote_log_prefetch_num,
         remote_file_download_thread_num: 
config.remote_file_download_thread_num,
         scanner_log_max_poll_records: config.scanner_log_max_poll_records,
diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs
index fdf90b7..75056a5 100644
--- a/bindings/python/src/config.rs
+++ b/bindings/python/src/config.rs
@@ -84,6 +84,17 @@ impl Config {
                                 ))
                             })?;
                     }
+                    "writer.bucket.no-key-assigner" => {
+                        config.writer_bucket_no_key_assigner = match 
value.as_str() {
+                            "round_robin" => 
fcore::config::NoKeyAssigner::RoundRobin,
+                            "sticky" => fcore::config::NoKeyAssigner::Sticky,
+                            other => {
+                                return Err(FlussError::new_err(format!(
+                                    "Unknown bucket assigner type: {other}, 
expected 'sticky' or 'round_robin'"
+                                )));
+                            }
+                        };
+                    }
                     _ => {
                         return Err(FlussError::new_err(format!("Unknown 
property: {key}")));
                     }
diff --git a/crates/fluss/src/client/write/bucket_assigner.rs b/crates/fluss/src/client/write/bucket_assigner.rs
index 7fcd20b..8ad38e3 100644
--- a/crates/fluss/src/client/write/bucket_assigner.rs
+++ b/crates/fluss/src/client/write/bucket_assigner.rs
@@ -106,6 +106,44 @@ impl BucketAssigner for StickyBucketAssigner {
     }
 }
 
+/// A [`BucketAssigner`] for tables without bucket keys that rotates through
+/// buckets one record at a time.
+///
+/// Unlike [`StickyBucketAssigner`], each record is assigned to the next bucket
+/// in a rotating sequence, providing even data distribution across all buckets.
+pub struct RoundRobinBucketAssigner {
+    table_path: Arc<PhysicalTablePath>,
+    /// Total bucket count of the table; only used when the cluster reports no
+    /// available buckets for `table_path`.
+    num_buckets: i32,
+    /// Wrapping sequence number, advanced once per assigned record. Seeded with
+    /// a random value in [`RoundRobinBucketAssigner::new`].
+    counter: AtomicI32,
+}
+
+impl RoundRobinBucketAssigner {
+    /// Creates an assigner for `table_path`, a table with `num_buckets` buckets.
+    ///
+    /// The rotation starts at a random position, so separate assigner
+    /// instances generally begin on different buckets.
+    pub fn new(table_path: Arc<PhysicalTablePath>, num_buckets: i32) -> Self {
+        let mut rng = rand::rng();
+        Self {
+            table_path,
+            num_buckets,
+            counter: AtomicI32::new(rng.random()),
+        }
+    }
+}
+
+impl BucketAssigner for RoundRobinBucketAssigner {
+    /// Always `false`: this assigner has no "current" bucket to abandon when a
+    /// batch fills up, since every record already moves to the next bucket.
+    fn abort_if_batch_full(&self) -> bool {
+        false
+    }
+
+    /// No-op: the rotation is driven solely by `counter`, not by batches.
+    fn on_new_batch(&self, _cluster: &Cluster, _prev_bucket_id: i32) {}
+
+    /// Returns the next bucket in the rotation; the bucket key is ignored.
+    ///
+    /// When the cluster reports available buckets for `table_path`, the
+    /// rotation runs over those; otherwise it falls back to
+    /// `counter % num_buckets` over all buckets.
+    fn assign_bucket(&self, _bucket_key: Option<&Bytes>, cluster: &Cluster) -> Result<i32> {
+        // `fetch_add` wraps on overflow; masking off the sign bit keeps the
+        // sequence value non-negative.
+        let next_value = self.counter.fetch_add(1, Ordering::Relaxed) & i32::MAX;
+        let available_buckets = cluster.get_available_buckets_for_table_path(&self.table_path);
+        if available_buckets.is_empty() {
+            Ok(next_value % self.num_buckets)
+        } else {
+            // `next_value` is non-negative, so widening it to usize is lossless;
+            // this avoids truncating `len()` down to i32.
+            let idx = next_value as usize % available_buckets.len();
+            Ok(available_buckets[idx].bucket_id())
+        }
+    }
+}
+
 /// A [BucketAssigner] which assigns based on a modulo hashing function
 pub struct HashBucketAssigner {
     num_buckets: i32,
@@ -173,6 +211,34 @@ mod tests {
         assert!((0..2).contains(&next_bucket));
     }
 
+    #[test]
+    fn round_robin_assigner_cycles_through_buckets() {
+        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
+        let num_buckets = 3;
+        let cluster = build_cluster(&table_path, 1, num_buckets);
+        let physical = Arc::new(PhysicalTablePath::of(Arc::new(table_path)));
+        let assigner = RoundRobinBucketAssigner::new(physical, num_buckets);
+        // `new` seeds the counter randomly. Pin it so the test is reproducible and
+        // never straddles the i32 wrap-around, where `& i32::MAX` restarts at 0
+        // and (because 2^31 is not a multiple of 3) breaks the rotation order.
+        assigner.counter.store(0, std::sync::atomic::Ordering::Relaxed);
+
+        let mut seen = Vec::new();
+        for _ in 0..(num_buckets * 2) {
+            let bucket = assigner.assign_bucket(None, &cluster).expect("bucket");
+            assert!((0..num_buckets).contains(&bucket));
+            seen.push(bucket);
+        }
+
+        // One rotation must visit every bucket exactly once; comparing only
+        // seen[i] with seen[i + 3] would also accept a constant assigner.
+        let cycle_len = num_buckets as usize;
+        let mut first_cycle = seen[..cycle_len].to_vec();
+        first_cycle.sort_unstable();
+        first_cycle.dedup();
+        assert_eq!(first_cycle.len(), cycle_len);
+
+        // The second rotation repeats the first in the same order.
+        assert_eq!(seen[..cycle_len], seen[cycle_len..]);
+    }
+
+    #[test]
+    fn round_robin_assigner_does_not_abort_on_batch_full() {
+        // Round-robin never sticks to a bucket, so it must never ask the
+        // accumulator to abort an append because a batch is full.
+        let tbl = TablePath::new(String::from("db"), String::from("tbl"));
+        let assigner =
+            RoundRobinBucketAssigner::new(Arc::new(PhysicalTablePath::of(Arc::new(tbl))), 3);
+        let aborts = assigner.abort_if_batch_full();
+        assert!(!aborts);
+    }
+
     #[test]
     fn hash_bucket_assigner_requires_key() {
         let assigner = HashBucketAssigner::new(3, <dyn 
BucketingFunction>::of(None));
diff --git a/crates/fluss/src/client/write/writer_client.rs b/crates/fluss/src/client/write/writer_client.rs
index 41ef4bb..23f523c 100644
--- a/crates/fluss/src/client/write/writer_client.rs
+++ b/crates/fluss/src/client/write/writer_client.rs
@@ -19,11 +19,12 @@ use crate::BucketId;
 use crate::bucketing::BucketingFunction;
 use crate::client::metadata::Metadata;
 use crate::client::write::bucket_assigner::{
-    BucketAssigner, HashBucketAssigner, StickyBucketAssigner,
+    BucketAssigner, HashBucketAssigner, RoundRobinBucketAssigner, 
StickyBucketAssigner,
 };
 use crate::client::write::sender::Sender;
 use crate::client::{RecordAccumulator, ResultHandle, WriteRecord};
 use crate::config::Config;
+use crate::config::NoKeyAssigner;
 use crate::error::{Error, Result};
 use crate::metadata::{PhysicalTablePath, TableInfo};
 use bytes::Bytes;
@@ -99,7 +100,12 @@ impl WriterClient {
         let (bucket_assigner, bucket_id) =
             self.assign_bucket(&record.table_info, bucket_key, 
physical_table_path)?;
 
-        let mut result = self.accumulate.append(record, bucket_id, &cluster, 
true)?;
+        let mut result = self.accumulate.append(
+            record,
+            bucket_id,
+            &cluster,
+            bucket_assigner.abort_if_batch_full(),
+        )?;
 
         if result.abort_record_for_new_batch {
             let prev_bucket_id = bucket_id;
@@ -125,10 +131,14 @@ impl WriterClient {
             if let Some(assigner) = self.bucket_assigners.get(table_path) {
                 assigner.clone()
             } else {
-                let assigner =
-                    Self::create_bucket_assigner(table_info, 
Arc::clone(table_path), bucket_key)?;
+                let assigner = Self::create_bucket_assigner(
+                    table_info,
+                    Arc::clone(table_path),
+                    bucket_key,
+                    &self.config,
+                )?;
                 self.bucket_assigners
-                    .insert(Arc::clone(table_path), 
Arc::clone(&assigner.clone()));
+                    .insert(Arc::clone(table_path), Arc::clone(&assigner));
                 assigner
             }
         };
@@ -164,6 +174,7 @@ impl WriterClient {
         table_info: &Arc<TableInfo>,
         table_path: Arc<PhysicalTablePath>,
         bucket_key: Option<&Bytes>,
+        config: &Config,
     ) -> Result<Arc<dyn BucketAssigner>> {
         if bucket_key.is_some() {
             let datalake_format = 
table_info.get_table_config().get_datalake_format()?;
@@ -173,8 +184,13 @@ impl WriterClient {
                 function,
             )))
         } else {
-            // TODO: Wire up toi use round robin/sticky according to 
ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER
-            Ok(Arc::new(StickyBucketAssigner::new(table_path)))
+            match config.writer_bucket_no_key_assigner {
+                NoKeyAssigner::Sticky => 
Ok(Arc::new(StickyBucketAssigner::new(table_path))),
+                NoKeyAssigner::RoundRobin => 
Ok(Arc::new(RoundRobinBucketAssigner::new(
+                    table_path,
+                    table_info.num_buckets,
+                ))),
+            }
         }
     }
 }
diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs
index ecf7e12..6ff4327 100644
--- a/crates/fluss/src/config.rs
+++ b/crates/fluss/src/config.rs
@@ -15,8 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use clap::Parser;
+use clap::{Parser, ValueEnum};
 use serde::{Deserialize, Serialize};
+use std::fmt;
 
 const DEFAULT_BOOTSTRAP_SERVER: &str = "127.0.0.1:9123";
 const DEFAULT_REQUEST_MAX_SIZE: i32 = 10 * 1024 * 1024;
@@ -28,6 +29,26 @@ const DEFAULT_MAX_POLL_RECORDS: usize = 500;
 
 const DEFAULT_ACKS: &str = "all";
 
+/// Bucket assigner strategy for tables without bucket keys.
+/// Matches Java `client.writer.bucket.no-key-assigner`.
+///
+/// The textual names are `sticky` and `round_robin` for serde, [`fmt::Display`]
+/// and the command line alike, matching what the C++/Python bindings parse.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum, Deserialize, Serialize)]
+#[serde(rename_all = "snake_case")]
+// clap's `ValueEnum` defaults to kebab-case (`round-robin`); align the CLI
+// spelling with serde/Display so `--writer-bucket-no-key-assigner round_robin` works.
+#[value(rename_all = "snake_case")]
+pub enum NoKeyAssigner {
+    /// Sticks to one bucket until the batch is full, then switches.
+    Sticky,
+    /// Assigns each record to the next bucket in a rotating sequence.
+    RoundRobin,
+}
+
+impl fmt::Display for NoKeyAssigner {
+    /// Writes the canonical snake_case name: `sticky` or `round_robin`.
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let name = match self {
+            Self::Sticky => "sticky",
+            Self::RoundRobin => "round_robin",
+        };
+        f.write_str(name)
+    }
+}
+
 #[derive(Parser, Debug, Clone, Deserialize, Serialize)]
 #[command(author, version, about, long_about = None)]
 pub struct Config {
@@ -46,6 +67,9 @@ pub struct Config {
     #[arg(long, default_value_t = DEFAULT_WRITER_BATCH_SIZE)]
     pub writer_batch_size: i32,
 
+    #[arg(long, value_enum, default_value_t = NoKeyAssigner::Sticky)]
+    pub writer_bucket_no_key_assigner: NoKeyAssigner,
+
     /// Maximum number of remote log segments to prefetch
     /// Default: 4 (matching Java CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM)
     #[arg(long, default_value_t = DEFAULT_PREFETCH_NUM)]
@@ -70,6 +94,7 @@ impl Default for Config {
             writer_acks: String::from(DEFAULT_ACKS),
             writer_retries: i32::MAX,
             writer_batch_size: DEFAULT_WRITER_BATCH_SIZE,
+            writer_bucket_no_key_assigner: NoKeyAssigner::Sticky,
             scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM,
             remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS,
             scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
diff --git a/website/docs/user-guide/cpp/example/configuration.md b/website/docs/user-guide/cpp/example/configuration.md
index 518a584..715e3c6 100644
--- a/website/docs/user-guide/cpp/example/configuration.md
+++ b/website/docs/user-guide/cpp/example/configuration.md
@@ -30,6 +30,7 @@ config.writer_request_max_size = 10 * 1024 * 1024;     // Max request size (10 M
 config.writer_acks = "all";                      // Wait for all replicas
 config.writer_retries = std::numeric_limits<int32_t>::max();  // Retry on failure
 config.writer_batch_size = 2 * 1024 * 1024;     // Batch size (2 MB)
+config.writer_bucket_no_key_assigner = "sticky"; // "sticky" or "round_robin"
 config.scanner_remote_log_prefetch_num = 4;      // Remote log prefetch count
 config.remote_file_download_thread_num = 3;  // Download threads
 ```
diff --git a/website/docs/user-guide/python/example/configuration.md b/website/docs/user-guide/python/example/configuration.md
index f2828c4..466bf0d 100644
--- a/website/docs/user-guide/python/example/configuration.md
+++ b/website/docs/user-guide/python/example/configuration.md
@@ -21,16 +21,17 @@ with await fluss.FlussConnection.create(config) as conn:
 
 ## Connection Configurations
 
-| Key                 | Description                                           | Default            |
-|---------------------|-------------------------------------------------------|--------------------|
-| `bootstrap.servers` | Coordinator server address                            | `127.0.0.1:9123`   |
-| `writer.request-max-size`  | Maximum request size in bytes                  | `10485760` (10 MB) |
-| `writer.acks`       | Acknowledgment setting (`all` waits for all replicas) | `all`              |
-| `writer.retries`    | Number of retries on failure                          | `2147483647`       |
-| `writer.batch-size` | Batch size for writes in bytes                        | `2097152` (2 MB)   |
-| `scanner.remote-log.prefetch-num` | Number of remote log segments to prefetch | `4`                |
-| `remote-file.download-thread-num` | Number of threads for remote log downloads | `3`               |
-| `scanner.log.max-poll-records` | Max records returned in a single poll()       | `500`              |
+| Key                               | Description                                                                           | Default            |
+|-----------------------------------|---------------------------------------------------------------------------------------|--------------------|
+| `bootstrap.servers`               | Coordinator server address                                                            | `127.0.0.1:9123`   |
+| `writer.request-max-size`         | Maximum request size in bytes                                                         | `10485760` (10 MB) |
+| `writer.acks`                     | Acknowledgment setting (`all` waits for all replicas)                                 | `all`              |
+| `writer.retries`                  | Number of retries on failure                                                          | `2147483647`       |
+| `writer.batch-size`               | Batch size for writes in bytes                                                        | `2097152` (2 MB)   |
+| `writer.bucket.no-key-assigner`   | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin`  | `sticky`           |
+| `scanner.remote-log.prefetch-num` | Number of remote log segments to prefetch                                             | `4`                |
+| `remote-file.download-thread-num` | Number of threads for remote log downloads                                            | `3`                |
+| `scanner.log.max-poll-records`    | Max records returned in a single poll()                                               | `500`              |
 
 Remember to close the connection when done:
 
diff --git a/website/docs/user-guide/rust/example/configuration.md b/website/docs/user-guide/rust/example/configuration.md
index a84c24d..92b9bf2 100644
--- a/website/docs/user-guide/rust/example/configuration.md
+++ b/website/docs/user-guide/rust/example/configuration.md
@@ -17,10 +17,11 @@ let conn = FlussConnection::new(config).await?;
 
 ## Connection Configurations
 
-| Option                    | Description                                           | Default          |
-|---------------------------|-------------------------------------------------------|------------------|
-| `bootstrap_servers`       | Coordinator server address                            | `127.0.0.1:9123` |
-| `writer_request_max_size` | Maximum request size in bytes                         | 10 MB            |
-| `writer_acks`             | Acknowledgment setting (`all` waits for all replicas) | `all`            |
-| `writer_retries`          | Number of retries on failure                          | `i32::MAX`       |
-| `writer_batch_size`       | Batch size for writes                                 | 2 MB             |
+| Option                          | Description                                                                          | Default          |
+|---------------------------------|--------------------------------------------------------------------------------------|------------------|
+| `bootstrap_servers`             | Coordinator server address                                                           | `127.0.0.1:9123` |
+| `writer_request_max_size`       | Maximum request size in bytes                                                        | 10 MB            |
+| `writer_acks`                   | Acknowledgment setting (`all` waits for all replicas)                                | `all`            |
+| `writer_retries`                | Number of retries on failure                                                         | `i32::MAX`       |
+| `writer_batch_size`             | Batch size for writes                                                                | 2 MB             |
+| `writer_bucket_no_key_assigner` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | `sticky`         |

Reply via email to