This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new fee26ee chore: make batch timeout configurable (#371)
fee26ee is described below
commit fee26ee88acd8eaeb4585e611d0dbdaf8f66ea6d
Author: Prajwal banakar <[email protected]>
AuthorDate: Fri Feb 27 09:28:50 2026 +0530
chore: make batch timeout configurable (#371)
---
bindings/cpp/include/fluss.hpp | 1 +
bindings/cpp/src/ffi_converter.hpp | 1 +
bindings/cpp/src/lib.rs | 2 ++
bindings/python/src/config.rs | 17 +++++++++++++++++
crates/fluss/src/client/write/accumulator.rs | 3 ++-
crates/fluss/src/config.rs | 7 +++++++
website/docs/user-guide/cpp/example/configuration.md | 1 +
.../docs/user-guide/python/example/configuration.md | 19 ++++++++++---------
website/docs/user-guide/rust/example/configuration.md | 1 +
9 files changed, 42 insertions(+), 10 deletions(-)
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index f17cafc..0f980b7 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -987,6 +987,7 @@ struct Configuration {
size_t remote_file_download_thread_num{3};
// Maximum number of records returned in a single call to Poll() for LogScanner
size_t scanner_log_max_poll_records{500};
+ int64_t writer_batch_timeout_ms{100};
};
class Connection {
diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp
index a2e7fa2..e376343 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -55,6 +55,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
ffi_config.scanner_remote_log_prefetch_num =
config.scanner_remote_log_prefetch_num;
ffi_config.remote_file_download_thread_num =
config.remote_file_download_thread_num;
ffi_config.scanner_log_max_poll_records =
config.scanner_log_max_poll_records;
+ ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;
return ffi_config;
}
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 32dbf7d..ea1307e 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -47,6 +47,7 @@ mod ffi {
scanner_remote_log_prefetch_num: usize,
remote_file_download_thread_num: usize,
scanner_log_max_poll_records: usize,
+ writer_batch_timeout_ms: i64,
}
struct FfiResult {
@@ -623,6 +624,7 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> {
writer_acks: config.writer_acks.to_string(),
writer_retries: config.writer_retries,
writer_batch_size: config.writer_batch_size,
+ writer_batch_timeout_ms: config.writer_batch_timeout_ms,
writer_bucket_no_key_assigner: assigner_type,
scanner_remote_log_prefetch_num:
config.scanner_remote_log_prefetch_num,
remote_file_download_thread_num:
config.remote_file_download_thread_num,
diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs
index 75056a5..5b7f2d3 100644
--- a/bindings/python/src/config.rs
+++ b/bindings/python/src/config.rs
@@ -60,6 +60,11 @@ impl Config {
FlussError::new_err(format!("Invalid value
'{value}' for '{key}': {e}"))
})?;
}
+ "writer.batch-timeout-ms" => {
+ config.writer_batch_timeout_ms =
value.parse::<i64>().map_err(|e| {
+ FlussError::new_err(format!("Invalid value
'{value}' for '{key}': {e}"))
+ })?;
+ }
"scanner.remote-log.prefetch-num" => {
config.scanner_remote_log_prefetch_num =
value.parse::<usize>().map_err(|e| {
@@ -200,6 +205,18 @@ impl Config {
fn set_scanner_log_max_poll_records(&mut self, num: usize) {
self.inner.scanner_log_max_poll_records = num;
}
+
+ /// Get the writer batch timeout in milliseconds
+ #[getter]
+ fn writer_batch_timeout_ms(&self) -> i64 {
+ self.inner.writer_batch_timeout_ms
+ }
+
+ /// Set the writer batch timeout in milliseconds
+ #[setter]
+ fn set_writer_batch_timeout_ms(&mut self, timeout: i64) {
+ self.inner.writer_batch_timeout_ms = timeout;
+ }
}
impl Config {
diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs
index 2c36452..0cf501c 100644
--- a/crates/fluss/src/client/write/accumulator.rs
+++ b/crates/fluss/src/client/write/accumulator.rs
@@ -50,11 +50,12 @@ pub struct RecordAccumulator {
impl RecordAccumulator {
pub fn new(config: Config) -> Self {
+ let batch_timeout_ms = config.writer_batch_timeout_ms;
RecordAccumulator {
config,
write_batches: Default::default(),
incomplete_batches: Default::default(),
- batch_timeout_ms: 500,
+ batch_timeout_ms,
closed: Default::default(),
flushes_in_progress: Default::default(),
appends_in_progress: Default::default(),
diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs
index 6ff4327..3026493 100644
--- a/crates/fluss/src/config.rs
+++ b/crates/fluss/src/config.rs
@@ -26,6 +26,7 @@ const DEFAULT_RETRIES: i32 = i32::MAX;
const DEFAULT_PREFETCH_NUM: usize = 4;
const DEFAULT_DOWNLOAD_THREADS: usize = 3;
const DEFAULT_MAX_POLL_RECORDS: usize = 500;
+const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100;
const DEFAULT_ACKS: &str = "all";
@@ -84,6 +85,11 @@ pub struct Config {
/// Default: 500 (matching Java CLIENT_SCANNER_LOG_MAX_POLL_RECORDS)
#[arg(long, default_value_t = DEFAULT_MAX_POLL_RECORDS)]
pub scanner_log_max_poll_records: usize,
+
+ /// The maximum time to wait for a batch to be completed in milliseconds.
+ /// Default: 100 (matching Java CLIENT_WRITER_BATCH_TIMEOUT)
+ #[arg(long, default_value_t = DEFAULT_WRITER_BATCH_TIMEOUT_MS)]
+ pub writer_batch_timeout_ms: i64,
}
impl Default for Config {
@@ -98,6 +104,7 @@ impl Default for Config {
scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM,
remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS,
scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
+ writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS,
}
}
}
diff --git a/website/docs/user-guide/cpp/example/configuration.md b/website/docs/user-guide/cpp/example/configuration.md
index 715e3c6..d73661a 100644
--- a/website/docs/user-guide/cpp/example/configuration.md
+++ b/website/docs/user-guide/cpp/example/configuration.md
@@ -30,6 +30,7 @@ config.writer_request_max_size = 10 * 1024 * 1024; // Max
request size (10 M
config.writer_acks = "all"; // Wait for all replicas
config.writer_retries = std::numeric_limits<int32_t>::max(); // Retry on
failure
config.writer_batch_size = 2 * 1024 * 1024; // Batch size (2 MB)
+config.writer_batch_timeout_ms = 100; // Max time to wait for a
batch to fill
config.writer_bucket_no_key_assigner = "sticky"; // "sticky" or "round_robin"
config.scanner_remote_log_prefetch_num = 4; // Remote log prefetch count
config.remote_file_download_thread_num = 3; // Download threads
diff --git a/website/docs/user-guide/python/example/configuration.md b/website/docs/user-guide/python/example/configuration.md
index 466bf0d..71e7199 100644
--- a/website/docs/user-guide/python/example/configuration.md
+++ b/website/docs/user-guide/python/example/configuration.md
@@ -23,15 +23,16 @@ with await fluss.FlussConnection.create(config) as conn:
| Key | Description
| Default |
|------------------------------------|--------------------------------------------------------------------------------------|--------------------|
-| `bootstrap.servers` | Coordinator server address
| `127.0.0.1:9123` |
-| `writer.request-max-size` | Maximum request size in bytes
| `10485760` (10 MB) |
-| `writer.acks` | Acknowledgment setting (`all` waits for
all replicas) | `all` |
-| `writer.retries` | Number of retries on failure
| `2147483647` |
-| `writer.batch-size` | Batch size for writes in bytes
| `2097152` (2 MB) |
-| `writer.bucket.no-key-assigner` | Bucket assignment strategy for tables
without bucket keys: `sticky` or `round_robin` | `sticky` |
-| `scanner.remote-log.prefetch-num` | Number of remote log segments to
prefetch | `4` |
-| `remote-file.download-thread-num` | Number of threads for remote log
downloads | `3` |
-| `scanner.log.max-poll-records` | Max records returned in a single poll()
| `500` |
+| `bootstrap.servers` | Coordinator server address
| `127.0.0.1:9123` |
+| `writer.request-max-size` | Maximum request size in bytes
| `10485760` (10 MB) |
+| `writer.acks` | Acknowledgment setting (`all` waits for
all replicas) | `all` |
+| `writer.retries` | Number of retries on failure
| `2147483647` |
+| `writer.batch-size` | Batch size for writes in bytes
| `2097152` (2 MB) |
+| `writer.bucket.no-key-assigner` | Bucket assignment strategy for tables
without bucket keys: `sticky` or `round_robin` | `sticky` |
+| `scanner.remote-log.prefetch-num` | Number of remote log segments to
prefetch | `4` |
+| `remote-file.download-thread-num` | Number of threads for remote log
downloads | `3` |
+| `scanner.log.max-poll-records` | Max records returned in a single poll()
| `500` |
+| `writer.batch-timeout-ms` | The maximum time to wait for a writer
batch to fill up before sending. | `100` |
Remember to close the connection when done:
diff --git a/website/docs/user-guide/rust/example/configuration.md b/website/docs/user-guide/rust/example/configuration.md
index 92b9bf2..7d7cc93 100644
--- a/website/docs/user-guide/rust/example/configuration.md
+++ b/website/docs/user-guide/rust/example/configuration.md
@@ -25,3 +25,4 @@ let conn = FlussConnection::new(config).await?;
| `writer_retries` | Number of retries on failure
| `i32::MAX` |
| `writer_batch_size` | Batch size for writes
| 2 MB |
| `writer_bucket_no_key_assigner` | Bucket assignment strategy for tables
without bucket keys: `sticky` or `round_robin` | `sticky` |
+| `writer_batch_timeout_ms` | The maximum time to wait for a writer
batch to fill up before sending. | `100` |