This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new fee26ee  chore: make batch timeout configurable (#371)
fee26ee is described below

commit fee26ee88acd8eaeb4585e611d0dbdaf8f66ea6d
Author: Prajwal banakar <[email protected]>
AuthorDate: Fri Feb 27 09:28:50 2026 +0530

    chore: make batch timeout configurable (#371)
---
 bindings/cpp/include/fluss.hpp                        |  1 +
 bindings/cpp/src/ffi_converter.hpp                    |  1 +
 bindings/cpp/src/lib.rs                               |  2 ++
 bindings/python/src/config.rs                         | 17 +++++++++++++++++
 crates/fluss/src/client/write/accumulator.rs          |  3 ++-
 crates/fluss/src/config.rs                            |  7 +++++++
 website/docs/user-guide/cpp/example/configuration.md  |  1 +
 .../docs/user-guide/python/example/configuration.md   | 19 ++++++++++---------
 website/docs/user-guide/rust/example/configuration.md |  1 +
 9 files changed, 42 insertions(+), 10 deletions(-)

diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index f17cafc..0f980b7 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -987,6 +987,7 @@ struct Configuration {
     size_t remote_file_download_thread_num{3};
     // Maximum number of records returned in a single call to Poll() for 
LogScanner
     size_t scanner_log_max_poll_records{500};
+    int64_t writer_batch_timeout_ms{100};
 };
 
 class Connection {
diff --git a/bindings/cpp/src/ffi_converter.hpp 
b/bindings/cpp/src/ffi_converter.hpp
index a2e7fa2..e376343 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -55,6 +55,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& 
config) {
     ffi_config.scanner_remote_log_prefetch_num = 
config.scanner_remote_log_prefetch_num;
     ffi_config.remote_file_download_thread_num = 
config.remote_file_download_thread_num;
     ffi_config.scanner_log_max_poll_records = 
config.scanner_log_max_poll_records;
+    ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;
     return ffi_config;
 }
 
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 32dbf7d..ea1307e 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -47,6 +47,7 @@ mod ffi {
         scanner_remote_log_prefetch_num: usize,
         remote_file_download_thread_num: usize,
         scanner_log_max_poll_records: usize,
+        writer_batch_timeout_ms: i64,
     }
 
     struct FfiResult {
@@ -623,6 +624,7 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut 
Connection, String> {
         writer_acks: config.writer_acks.to_string(),
         writer_retries: config.writer_retries,
         writer_batch_size: config.writer_batch_size,
+        writer_batch_timeout_ms: config.writer_batch_timeout_ms,
         writer_bucket_no_key_assigner: assigner_type,
         scanner_remote_log_prefetch_num: 
config.scanner_remote_log_prefetch_num,
         remote_file_download_thread_num: 
config.remote_file_download_thread_num,
diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs
index 75056a5..5b7f2d3 100644
--- a/bindings/python/src/config.rs
+++ b/bindings/python/src/config.rs
@@ -60,6 +60,11 @@ impl Config {
                             FlussError::new_err(format!("Invalid value 
'{value}' for '{key}': {e}"))
                         })?;
                     }
+                    "writer.batch-timeout-ms" => {
+                        config.writer_batch_timeout_ms = 
value.parse::<i64>().map_err(|e| {
+                            FlussError::new_err(format!("Invalid value 
'{value}' for '{key}': {e}"))
+                        })?;
+                    }
                     "scanner.remote-log.prefetch-num" => {
                         config.scanner_remote_log_prefetch_num =
                             value.parse::<usize>().map_err(|e| {
@@ -200,6 +205,18 @@ impl Config {
     fn set_scanner_log_max_poll_records(&mut self, num: usize) {
         self.inner.scanner_log_max_poll_records = num;
     }
+
+    /// Get the writer batch timeout in milliseconds
+    #[getter]
+    fn writer_batch_timeout_ms(&self) -> i64 {
+        self.inner.writer_batch_timeout_ms
+    }
+
+    /// Set the writer batch timeout in milliseconds
+    #[setter]
+    fn set_writer_batch_timeout_ms(&mut self, timeout: i64) {
+        self.inner.writer_batch_timeout_ms = timeout;
+    }
 }
 
 impl Config {
diff --git a/crates/fluss/src/client/write/accumulator.rs 
b/crates/fluss/src/client/write/accumulator.rs
index 2c36452..0cf501c 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -50,11 +50,12 @@ pub struct RecordAccumulator {
 
 impl RecordAccumulator {
     pub fn new(config: Config) -> Self {
+        let batch_timeout_ms = config.writer_batch_timeout_ms;
         RecordAccumulator {
             config,
             write_batches: Default::default(),
             incomplete_batches: Default::default(),
-            batch_timeout_ms: 500,
+            batch_timeout_ms,
             closed: Default::default(),
             flushes_in_progress: Default::default(),
             appends_in_progress: Default::default(),
diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs
index 6ff4327..3026493 100644
--- a/crates/fluss/src/config.rs
+++ b/crates/fluss/src/config.rs
@@ -26,6 +26,7 @@ const DEFAULT_RETRIES: i32 = i32::MAX;
 const DEFAULT_PREFETCH_NUM: usize = 4;
 const DEFAULT_DOWNLOAD_THREADS: usize = 3;
 const DEFAULT_MAX_POLL_RECORDS: usize = 500;
+const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100;
 
 const DEFAULT_ACKS: &str = "all";
 
@@ -84,6 +85,11 @@ pub struct Config {
     /// Default: 500 (matching Java CLIENT_SCANNER_LOG_MAX_POLL_RECORDS)
     #[arg(long, default_value_t = DEFAULT_MAX_POLL_RECORDS)]
     pub scanner_log_max_poll_records: usize,
+
+    /// The maximum time to wait for a batch to be completed in milliseconds.
+    /// Default: 100 (matching Java CLIENT_WRITER_BATCH_TIMEOUT)
+    #[arg(long, default_value_t = DEFAULT_WRITER_BATCH_TIMEOUT_MS)]
+    pub writer_batch_timeout_ms: i64,
 }
 
 impl Default for Config {
@@ -98,6 +104,7 @@ impl Default for Config {
             scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM,
             remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS,
             scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
+            writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS,
         }
     }
 }
diff --git a/website/docs/user-guide/cpp/example/configuration.md 
b/website/docs/user-guide/cpp/example/configuration.md
index 715e3c6..d73661a 100644
--- a/website/docs/user-guide/cpp/example/configuration.md
+++ b/website/docs/user-guide/cpp/example/configuration.md
@@ -30,6 +30,7 @@ config.writer_request_max_size = 10 * 1024 * 1024;     // Max 
request size (10 M
 config.writer_acks = "all";                      // Wait for all replicas
 config.writer_retries = std::numeric_limits<int32_t>::max();  // Retry on 
failure
 config.writer_batch_size = 2 * 1024 * 1024;     // Batch size (2 MB)
+config.writer_batch_timeout_ms = 100;           // Max time to wait for a 
batch to fill
 config.writer_bucket_no_key_assigner = "sticky"; // "sticky" or "round_robin"
 config.scanner_remote_log_prefetch_num = 4;      // Remote log prefetch count
 config.remote_file_download_thread_num = 3;  // Download threads
diff --git a/website/docs/user-guide/python/example/configuration.md 
b/website/docs/user-guide/python/example/configuration.md
index 466bf0d..71e7199 100644
--- a/website/docs/user-guide/python/example/configuration.md
+++ b/website/docs/user-guide/python/example/configuration.md
@@ -23,15 +23,16 @@ with await fluss.FlussConnection.create(config) as conn:
 
 | Key                                | Description                             
                                             | Default            |
 
|------------------------------------|--------------------------------------------------------------------------------------|--------------------|
-| `bootstrap.servers`               | Coordinator server address               
                                             | `127.0.0.1:9123`   |
-| `writer.request-max-size`         | Maximum request size in bytes            
                                             | `10485760` (10 MB) |
-| `writer.acks`                     | Acknowledgment setting (`all` waits for 
all replicas)                                 | `all`              |
-| `writer.retries`                  | Number of retries on failure             
                                             | `2147483647`       |
-| `writer.batch-size`               | Batch size for writes in bytes           
                                             | `2097152` (2 MB)   |
-| `writer.bucket.no-key-assigner`   | Bucket assignment strategy for tables 
without bucket keys: `sticky` or `round_robin`  | `sticky`           |
-| `scanner.remote-log.prefetch-num` | Number of remote log segments to 
prefetch                                             | `4`                |
-| `remote-file.download-thread-num` | Number of threads for remote log 
downloads                                            | `3`                |
-| `scanner.log.max-poll-records`    | Max records returned in a single poll()  
                                             | `500`              |
+| `bootstrap.servers`               | Coordinator server address               
                                            | `127.0.0.1:9123`   |
+| `writer.request-max-size`         | Maximum request size in bytes            
                                            | `10485760` (10 MB) |
+| `writer.acks`                     | Acknowledgment setting (`all` waits for 
all replicas)                                | `all`              |
+| `writer.retries`                  | Number of retries on failure             
                                            | `2147483647`       |
+| `writer.batch-size`               | Batch size for writes in bytes           
                                            | `2097152` (2 MB)   |
+| `writer.bucket.no-key-assigner`   | Bucket assignment strategy for tables 
without bucket keys: `sticky` or `round_robin` | `sticky`           |
+| `scanner.remote-log.prefetch-num` | Number of remote log segments to 
prefetch                                            | `4`                |
+| `remote-file.download-thread-num` | Number of threads for remote log 
downloads                                           | `3`                |
+| `scanner.log.max-poll-records`    | Max records returned in a single poll()  
                                            | `500`              |
+| `writer.batch-timeout-ms`         | The maximum time to wait for a writer 
batch to fill up before sending.               | `100`              |
 
 Remember to close the connection when done:
 
diff --git a/website/docs/user-guide/rust/example/configuration.md 
b/website/docs/user-guide/rust/example/configuration.md
index 92b9bf2..7d7cc93 100644
--- a/website/docs/user-guide/rust/example/configuration.md
+++ b/website/docs/user-guide/rust/example/configuration.md
@@ -25,3 +25,4 @@ let conn = FlussConnection::new(config).await?;
 | `writer_retries`                | Number of retries on failure               
                                          | `i32::MAX`       |
 | `writer_batch_size`             | Batch size for writes                      
                                          | 2 MB             |
 | `writer_bucket_no_key_assigner` | Bucket assignment strategy for tables 
without bucket keys: `sticky` or `round_robin` | `sticky`         |
+| `writer_batch_timeout_ms`       | The maximum time to wait for a writer 
batch to fill up before sending.               | `100`            |

Reply via email to