This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 847f5fe chore: introduce max poll records config option (#355)
847f5fe is described below
commit 847f5fe90857fa5744a29ffa07450d0b35a8586f
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Feb 22 02:59:32 2026 +0000
chore: introduce max poll records config option (#355)
---
bindings/cpp/include/fluss.hpp | 2 +
bindings/cpp/src/ffi_converter.hpp | 1 +
bindings/cpp/src/lib.rs | 2 +
bindings/python/fluss/__init__.pyi | 4 ++
bindings/python/src/config.rs | 56 ++++++++++++++++------
crates/fluss/src/client/table/scanner.rs | 5 +-
crates/fluss/src/config.rs | 7 +++
website/docs/user-guide/cpp/api-reference.md | 1 +
website/docs/user-guide/python/api-reference.md | 15 +++---
.../user-guide/python/example/configuration.md | 3 ++
website/docs/user-guide/rust/api-reference.md | 1 +
11 files changed, 73 insertions(+), 24 deletions(-)
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 9ea7e41..9a62828 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -983,6 +983,8 @@ struct Configuration {
size_t scanner_remote_log_prefetch_num{4};
// Number of threads for downloading remote log data
size_t remote_file_download_thread_num{3};
+ // Maximum number of records returned in a single call to Poll() for LogScanner
+ size_t scanner_log_max_poll_records{500};
};
class Connection {
diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp
index cff1a84..370429b 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -53,6 +53,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
ffi_config.writer_batch_size = config.writer_batch_size;
ffi_config.scanner_remote_log_prefetch_num =
config.scanner_remote_log_prefetch_num;
ffi_config.remote_file_download_thread_num =
config.remote_file_download_thread_num;
+ ffi_config.scanner_log_max_poll_records =
config.scanner_log_max_poll_records;
return ffi_config;
}
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 9f987b9..fad98cf 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -45,6 +45,7 @@ mod ffi {
writer_batch_size: i32,
scanner_remote_log_prefetch_num: usize,
remote_file_download_thread_num: usize,
+ scanner_log_max_poll_records: usize,
}
struct FfiResult {
@@ -614,6 +615,7 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> {
writer_batch_size: config.writer_batch_size,
scanner_remote_log_prefetch_num:
config.scanner_remote_log_prefetch_num,
remote_file_download_thread_num:
config.remote_file_download_thread_num,
+ scanner_log_max_poll_records: config.scanner_log_max_poll_records,
};
let conn = RUNTIME.block_on(async {
fcore::client::FlussConnection::new(config).await });
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
index 4b7fa4e..6f9ae0b 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -161,6 +161,10 @@ class Config:
def remote_file_download_thread_num(self) -> int: ...
@remote_file_download_thread_num.setter
def remote_file_download_thread_num(self, num: int) -> None: ...
+ @property
+ def scanner_log_max_poll_records(self) -> int: ...
+ @scanner_log_max_poll_records.setter
+ def scanner_log_max_poll_records(self, num: int) -> None: ...
class FlussConnection:
@staticmethod
diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs
index 237ab6f..fdf90b7 100644
--- a/bindings/python/src/config.rs
+++ b/bindings/python/src/config.rs
@@ -43,32 +43,46 @@ impl Config {
config.bootstrap_servers = value;
}
"writer.request-max-size" => {
- if let Ok(size) = value.parse::<i32>() {
- config.writer_request_max_size = size;
- }
+ config.writer_request_max_size =
value.parse::<i32>().map_err(|e| {
+ FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
+ })?;
}
"writer.acks" => {
config.writer_acks = value;
}
"writer.retries" => {
- if let Ok(retries) = value.parse::<i32>() {
- config.writer_retries = retries;
- }
+ config.writer_retries =
value.parse::<i32>().map_err(|e| {
+ FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
+ })?;
}
"writer.batch-size" => {
- if let Ok(size) = value.parse::<i32>() {
- config.writer_batch_size = size;
- }
+ config.writer_batch_size =
value.parse::<i32>().map_err(|e| {
+ FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
+ })?;
}
"scanner.remote-log.prefetch-num" => {
- if let Ok(num) = value.parse::<usize>() {
- config.scanner_remote_log_prefetch_num = num;
- }
+ config.scanner_remote_log_prefetch_num =
+ value.parse::<usize>().map_err(|e| {
+ FlussError::new_err(format!(
+ "Invalid value '{value}' for '{key}': {e}"
+ ))
+ })?;
}
"remote-file.download-thread-num" => {
- if let Ok(num) = value.parse::<usize>() {
- config.remote_file_download_thread_num = num;
- }
+ config.remote_file_download_thread_num =
+ value.parse::<usize>().map_err(|e| {
+ FlussError::new_err(format!(
+ "Invalid value '{value}' for '{key}': {e}"
+ ))
+ })?;
+ }
+ "scanner.log.max-poll-records" => {
+ config.scanner_log_max_poll_records =
+ value.parse::<usize>().map_err(|e| {
+ FlussError::new_err(format!(
+ "Invalid value '{value}' for '{key}': {e}"
+ ))
+ })?;
}
_ => {
return Err(FlussError::new_err(format!("Unknown property: {key}")));
@@ -163,6 +177,18 @@ impl Config {
fn set_remote_file_download_thread_num(&mut self, num: usize) {
self.inner.remote_file_download_thread_num = num;
}
+
+ /// Get the scanner log max poll records
+ #[getter]
+ fn scanner_log_max_poll_records(&self) -> usize {
+ self.inner.scanner_log_max_poll_records
+ }
+
+ /// Set the scanner log max poll records
+ #[setter]
+ fn set_scanner_log_max_poll_records(&mut self, num: usize) {
+ self.inner.scanner_log_max_poll_records = num;
+ }
}
impl Config {
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 0900267..4b6f809 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -636,6 +636,7 @@ struct LogFetcher {
security_token_manager: Arc<SecurityTokenManager>,
log_fetch_buffer: Arc<LogFetchBuffer>,
nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
+ max_poll_records: usize,
}
struct FetchResponseContext {
@@ -694,6 +695,7 @@ impl LogFetcher {
security_token_manager,
log_fetch_buffer,
nodes_with_pending_fetch_requests:
Arc::new(Mutex::new(HashSet::new())),
+ max_poll_records: config.scanner_log_max_poll_records,
})
}
@@ -1092,9 +1094,8 @@ impl LogFetcher {
/// Collect completed fetches from buffer
/// Reference: LogFetchCollector.collectFetch in Java
fn collect_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
- const MAX_POLL_RECORDS: usize = 500; // Default max poll records
let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
- let mut records_remaining = MAX_POLL_RECORDS;
+ let mut records_remaining = self.max_poll_records;
let collect_result: Result<()> = {
while records_remaining > 0 {
diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs
index 92f0b0d..ecf7e12 100644
--- a/crates/fluss/src/config.rs
+++ b/crates/fluss/src/config.rs
@@ -24,6 +24,7 @@ const DEFAULT_WRITER_BATCH_SIZE: i32 = 2 * 1024 * 1024;
const DEFAULT_RETRIES: i32 = i32::MAX;
const DEFAULT_PREFETCH_NUM: usize = 4;
const DEFAULT_DOWNLOAD_THREADS: usize = 3;
+const DEFAULT_MAX_POLL_RECORDS: usize = 500;
const DEFAULT_ACKS: &str = "all";
@@ -54,6 +55,11 @@ pub struct Config {
/// Default: 3 (matching Java REMOTE_FILE_DOWNLOAD_THREAD_NUM)
#[arg(long, default_value_t = DEFAULT_DOWNLOAD_THREADS)]
pub remote_file_download_thread_num: usize,
+
+ /// Maximum number of records returned in a single call to poll() for LogScanner.
+ /// Default: 500 (matching Java CLIENT_SCANNER_LOG_MAX_POLL_RECORDS)
+ #[arg(long, default_value_t = DEFAULT_MAX_POLL_RECORDS)]
+ pub scanner_log_max_poll_records: usize,
}
impl Default for Config {
@@ -66,6 +72,7 @@ impl Default for Config {
writer_batch_size: DEFAULT_WRITER_BATCH_SIZE,
scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM,
remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS,
+ scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
}
}
}
diff --git a/website/docs/user-guide/cpp/api-reference.md b/website/docs/user-guide/cpp/api-reference.md
index 433c5da..c18778b 100644
--- a/website/docs/user-guide/cpp/api-reference.md
+++ b/website/docs/user-guide/cpp/api-reference.md
@@ -24,6 +24,7 @@ Complete API reference for the Fluss C++ client.
| `writer_batch_size` | `int32_t` | `2097152` (2 MB) |
Batch size for writes in bytes |
| `scanner_remote_log_prefetch_num` | `size_t` | `4` |
Number of remote log segments to prefetch |
| `remote_file_download_thread_num` | `size_t` | `3` |
Number of threads for remote log downloads |
+| `scanner_log_max_poll_records` | `size_t` | `500` | Maximum number of records returned in a single Poll() |
## `Connection`
diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md
index 27a57dc..fa62fd9 100644
--- a/website/docs/user-guide/python/api-reference.md
+++ b/website/docs/user-guide/python/api-reference.md
@@ -10,13 +10,14 @@ Complete API reference for the Fluss Python client.
| Method / Property | Description
|
|------------------------------------|-----------------------------------------------------------|
| `Config(properties: dict = None)` | Create config from a dict of key-value
pairs |
-| `.bootstrap_servers` | Get/set coordinator server address
|
-| `.writer_request_max_size` | Get/set max request size in bytes
|
-| `.writer_acks` | Get/set acknowledgment setting (`"all"`
for all replicas) |
-| `.writer_retries` | Get/set number of retries on failure
|
-| `.writer_batch_size` | Get/set write batch size in bytes
|
-| `.scanner_remote_log_prefetch_num` | Get/set number of remote log segments
to prefetch |
-| `.remote_file_download_thread_num` | Get/set number of threads for remote
log downloads |
+| `bootstrap_servers` | Get/set coordinator server address
|
+| `writer_request_max_size` | Get/set max request size in bytes
|
+| `writer_acks` | Get/set acknowledgment setting (`"all"`
for all replicas) |
+| `writer_retries` | Get/set number of retries on failure
|
+| `writer_batch_size` | Get/set write batch size in bytes
|
+| `scanner_remote_log_prefetch_num` | Get/set number of remote log segments
to prefetch |
+| `remote_file_download_thread_num` | Get/set number of threads for remote
log downloads |
+| `scanner_log_max_poll_records` | Get/set max number of records returned
in a single poll() |
## `FlussConnection`
diff --git a/website/docs/user-guide/python/example/configuration.md b/website/docs/user-guide/python/example/configuration.md
index 9686fc6..f2828c4 100644
--- a/website/docs/user-guide/python/example/configuration.md
+++ b/website/docs/user-guide/python/example/configuration.md
@@ -28,6 +28,9 @@ with await fluss.FlussConnection.create(config) as conn:
| `writer.acks` | Acknowledgment setting (`all` waits for all replicas)
| `all` |
| `writer.retries` | Number of retries on failure
| `2147483647` |
| `writer.batch-size` | Batch size for writes in bytes
| `2097152` (2 MB) |
+| `scanner.remote-log.prefetch-num` | Number of remote log segments to
prefetch | `4` |
+| `remote-file.download-thread-num` | Number of threads for remote log
downloads | `3` |
+| `scanner.log.max-poll-records` | Max records returned in a single poll()
| `500` |
Remember to close the connection when done:
diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md
index 0134fbc..a38cd7d 100644
--- a/website/docs/user-guide/rust/api-reference.md
+++ b/website/docs/user-guide/rust/api-reference.md
@@ -16,6 +16,7 @@ Complete API reference for the Fluss Rust client.
| `writer_batch_size` | `i32` | `2097152` (2 MB) | Batch
size for writes in bytes |
| `scanner_remote_log_prefetch_num` | `usize` | `4` | Number
of remote log segments to prefetch |
| `remote_file_download_thread_num` | `usize` | `3` | Number
of threads for remote log downloads |
+| `scanner_log_max_poll_records` | `usize` | `500` | Maximum number of records returned in a single poll() |
## `FlussConnection`