This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 847f5fe  chore: introduce max poll records config option (#355)
847f5fe is described below

commit 847f5fe90857fa5744a29ffa07450d0b35a8586f
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Feb 22 02:59:32 2026 +0000

    chore: introduce max poll records config option (#355)
---
 bindings/cpp/include/fluss.hpp                     |  2 +
 bindings/cpp/src/ffi_converter.hpp                 |  1 +
 bindings/cpp/src/lib.rs                            |  2 +
 bindings/python/fluss/__init__.pyi                 |  4 ++
 bindings/python/src/config.rs                      | 56 ++++++++++++++++------
 crates/fluss/src/client/table/scanner.rs           |  5 +-
 crates/fluss/src/config.rs                         |  7 +++
 website/docs/user-guide/cpp/api-reference.md       |  1 +
 website/docs/user-guide/python/api-reference.md    | 15 +++---
 .../user-guide/python/example/configuration.md     |  3 ++
 website/docs/user-guide/rust/api-reference.md      |  1 +
 11 files changed, 73 insertions(+), 24 deletions(-)

diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 9ea7e41..9a62828 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -983,6 +983,8 @@ struct Configuration {
     size_t scanner_remote_log_prefetch_num{4};
     // Number of threads for downloading remote log data
     size_t remote_file_download_thread_num{3};
+    // Maximum number of records returned in a single call to Poll() for LogScanner
+    size_t scanner_log_max_poll_records{500};
 };
 
 class Connection {
diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp
index cff1a84..370429b 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -53,6 +53,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
     ffi_config.writer_batch_size = config.writer_batch_size;
     ffi_config.scanner_remote_log_prefetch_num = config.scanner_remote_log_prefetch_num;
     ffi_config.remote_file_download_thread_num = config.remote_file_download_thread_num;
+    ffi_config.scanner_log_max_poll_records = config.scanner_log_max_poll_records;
     return ffi_config;
 }
 
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index 9f987b9..fad98cf 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -45,6 +45,7 @@ mod ffi {
         writer_batch_size: i32,
         scanner_remote_log_prefetch_num: usize,
         remote_file_download_thread_num: usize,
+        scanner_log_max_poll_records: usize,
     }
 
     struct FfiResult {
@@ -614,6 +615,7 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut Connection, String> {
         writer_batch_size: config.writer_batch_size,
         scanner_remote_log_prefetch_num: config.scanner_remote_log_prefetch_num,
         remote_file_download_thread_num: config.remote_file_download_thread_num,
+        scanner_log_max_poll_records: config.scanner_log_max_poll_records,
     };
 
     let conn = RUNTIME.block_on(async { fcore::client::FlussConnection::new(config).await });
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
index 4b7fa4e..6f9ae0b 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -161,6 +161,10 @@ class Config:
     def remote_file_download_thread_num(self) -> int: ...
     @remote_file_download_thread_num.setter
     def remote_file_download_thread_num(self, num: int) -> None: ...
+    @property
+    def scanner_log_max_poll_records(self) -> int: ...
+    @scanner_log_max_poll_records.setter
+    def scanner_log_max_poll_records(self, num: int) -> None: ...
 
 class FlussConnection:
     @staticmethod
diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs
index 237ab6f..fdf90b7 100644
--- a/bindings/python/src/config.rs
+++ b/bindings/python/src/config.rs
@@ -43,32 +43,46 @@ impl Config {
                         config.bootstrap_servers = value;
                     }
                     "writer.request-max-size" => {
-                        if let Ok(size) = value.parse::<i32>() {
-                            config.writer_request_max_size = size;
-                        }
+                        config.writer_request_max_size = value.parse::<i32>().map_err(|e| {
+                            FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
+                        })?;
                     }
                     "writer.acks" => {
                         config.writer_acks = value;
                     }
                     "writer.retries" => {
-                        if let Ok(retries) = value.parse::<i32>() {
-                            config.writer_retries = retries;
-                        }
+                        config.writer_retries = value.parse::<i32>().map_err(|e| {
+                            FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
+                        })?;
                     }
                     "writer.batch-size" => {
-                        if let Ok(size) = value.parse::<i32>() {
-                            config.writer_batch_size = size;
-                        }
+                        config.writer_batch_size = value.parse::<i32>().map_err(|e| {
+                            FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
+                        })?;
                     }
                     "scanner.remote-log.prefetch-num" => {
-                        if let Ok(num) = value.parse::<usize>() {
-                            config.scanner_remote_log_prefetch_num = num;
-                        }
+                        config.scanner_remote_log_prefetch_num =
+                            value.parse::<usize>().map_err(|e| {
+                                FlussError::new_err(format!(
+                                    "Invalid value '{value}' for '{key}': {e}"
+                                ))
+                            })?;
                     }
                     "remote-file.download-thread-num" => {
-                        if let Ok(num) = value.parse::<usize>() {
-                            config.remote_file_download_thread_num = num;
-                        }
+                        config.remote_file_download_thread_num =
+                            value.parse::<usize>().map_err(|e| {
+                                FlussError::new_err(format!(
+                                    "Invalid value '{value}' for '{key}': {e}"
+                                ))
+                            })?;
+                    }
+                    "scanner.log.max-poll-records" => {
+                        config.scanner_log_max_poll_records =
+                            value.parse::<usize>().map_err(|e| {
+                                FlussError::new_err(format!(
+                                    "Invalid value '{value}' for '{key}': {e}"
+                                ))
+                            })?;
                     }
                     _ => {
                         return Err(FlussError::new_err(format!("Unknown property: {key}")));
@@ -163,6 +177,18 @@ impl Config {
     fn set_remote_file_download_thread_num(&mut self, num: usize) {
         self.inner.remote_file_download_thread_num = num;
     }
+
+    /// Get the scanner log max poll records
+    #[getter]
+    fn scanner_log_max_poll_records(&self) -> usize {
+        self.inner.scanner_log_max_poll_records
+    }
+
+    /// Set the scanner log max poll records
+    #[setter]
+    fn set_scanner_log_max_poll_records(&mut self, num: usize) {
+        self.inner.scanner_log_max_poll_records = num;
+    }
 }
 
 impl Config {
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 0900267..4b6f809 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -636,6 +636,7 @@ struct LogFetcher {
     security_token_manager: Arc<SecurityTokenManager>,
     log_fetch_buffer: Arc<LogFetchBuffer>,
     nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
+    max_poll_records: usize,
 }
 
 struct FetchResponseContext {
@@ -694,6 +695,7 @@ impl LogFetcher {
             security_token_manager,
             log_fetch_buffer,
             nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())),
+            max_poll_records: config.scanner_log_max_poll_records,
         })
     }
 
@@ -1092,9 +1094,8 @@ impl LogFetcher {
     /// Collect completed fetches from buffer
     /// Reference: LogFetchCollector.collectFetch in Java
     fn collect_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
-        const MAX_POLL_RECORDS: usize = 500; // Default max poll records
         let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
-        let mut records_remaining = MAX_POLL_RECORDS;
+        let mut records_remaining = self.max_poll_records;
 
         let collect_result: Result<()> = {
             while records_remaining > 0 {
diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs
index 92f0b0d..ecf7e12 100644
--- a/crates/fluss/src/config.rs
+++ b/crates/fluss/src/config.rs
@@ -24,6 +24,7 @@ const DEFAULT_WRITER_BATCH_SIZE: i32 = 2 * 1024 * 1024;
 const DEFAULT_RETRIES: i32 = i32::MAX;
 const DEFAULT_PREFETCH_NUM: usize = 4;
 const DEFAULT_DOWNLOAD_THREADS: usize = 3;
+const DEFAULT_MAX_POLL_RECORDS: usize = 500;
 
 const DEFAULT_ACKS: &str = "all";
 
@@ -54,6 +55,11 @@ pub struct Config {
     /// Default: 3 (matching Java REMOTE_FILE_DOWNLOAD_THREAD_NUM)
     #[arg(long, default_value_t = DEFAULT_DOWNLOAD_THREADS)]
     pub remote_file_download_thread_num: usize,
+
+    /// Maximum number of records returned in a single call to poll() for LogScanner.
+    /// Default: 500 (matching Java CLIENT_SCANNER_LOG_MAX_POLL_RECORDS)
+    #[arg(long, default_value_t = DEFAULT_MAX_POLL_RECORDS)]
+    pub scanner_log_max_poll_records: usize,
 }
 
 impl Default for Config {
@@ -66,6 +72,7 @@ impl Default for Config {
             writer_batch_size: DEFAULT_WRITER_BATCH_SIZE,
             scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM,
             remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS,
+            scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
         }
     }
 }
diff --git a/website/docs/user-guide/cpp/api-reference.md b/website/docs/user-guide/cpp/api-reference.md
index 433c5da..c18778b 100644
--- a/website/docs/user-guide/cpp/api-reference.md
+++ b/website/docs/user-guide/cpp/api-reference.md
@@ -24,6 +24,7 @@ Complete API reference for the Fluss C++ client.
 | `writer_batch_size`               | `int32_t`     | `2097152` (2 MB)     | Batch size for writes in bytes                                  |
 | `scanner_remote_log_prefetch_num` | `size_t`      | `4`                  | Number of remote log segments to prefetch                       |
 | `remote_file_download_thread_num` | `size_t`      | `3`                  | Number of threads for remote log downloads                      |
+| `scanner_log_max_poll_records`    | `size_t`      | `500`                | Maximum number of records returned in a single Poll()           |
 
 ## `Connection`
 
diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md
index 27a57dc..fa62fd9 100644
--- a/website/docs/user-guide/python/api-reference.md
+++ b/website/docs/user-guide/python/api-reference.md
@@ -10,13 +10,14 @@ Complete API reference for the Fluss Python client.
 | Method / Property                  | Description                                               |
 |------------------------------------|-----------------------------------------------------------|
 | `Config(properties: dict = None)`  | Create config from a dict of key-value pairs              |
-| `.bootstrap_servers`               | Get/set coordinator server address                        |
-| `.writer_request_max_size`         | Get/set max request size in bytes                         |
-| `.writer_acks`                     | Get/set acknowledgment setting (`"all"` for all replicas) |
-| `.writer_retries`                  | Get/set number of retries on failure                      |
-| `.writer_batch_size`               | Get/set write batch size in bytes                         |
-| `.scanner_remote_log_prefetch_num` | Get/set number of remote log segments to prefetch         |
-| `.remote_file_download_thread_num` | Get/set number of threads for remote log downloads        |
+| `bootstrap_servers`                | Get/set coordinator server address                        |
+| `writer_request_max_size`          | Get/set max request size in bytes                         |
+| `writer_acks`                      | Get/set acknowledgment setting (`"all"` for all replicas) |
+| `writer_retries`                   | Get/set number of retries on failure                      |
+| `writer_batch_size`                | Get/set write batch size in bytes                         |
+| `scanner_remote_log_prefetch_num`  | Get/set number of remote log segments to prefetch         |
+| `remote_file_download_thread_num`  | Get/set number of threads for remote log downloads        |
+| `scanner_log_max_poll_records`     | Get/set max number of records returned in a single poll() |
 
 ## `FlussConnection`
 
diff --git a/website/docs/user-guide/python/example/configuration.md b/website/docs/user-guide/python/example/configuration.md
index 9686fc6..f2828c4 100644
--- a/website/docs/user-guide/python/example/configuration.md
+++ b/website/docs/user-guide/python/example/configuration.md
@@ -28,6 +28,9 @@ with await fluss.FlussConnection.create(config) as conn:
 | `writer.acks`       | Acknowledgment setting (`all` waits for all replicas) | `all`              |
 | `writer.retries`    | Number of retries on failure                          | `2147483647`       |
 | `writer.batch-size` | Batch size for writes in bytes                        | `2097152` (2 MB)   |
+| `scanner.remote-log.prefetch-num` | Number of remote log segments to prefetch | `4`                |
+| `remote-file.download-thread-num` | Number of threads for remote log downloads | `3`               |
+| `scanner.log.max-poll-records` | Max records returned in a single poll()       | `500`              |
 
 Remember to close the connection when done:
 
diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md
index 0134fbc..a38cd7d 100644
--- a/website/docs/user-guide/rust/api-reference.md
+++ b/website/docs/user-guide/rust/api-reference.md
@@ -16,6 +16,7 @@ Complete API reference for the Fluss Rust client.
 | `writer_batch_size`               | `i32`    | `2097152` (2 MB)   | Batch size for writes in bytes                          |
 | `scanner_remote_log_prefetch_num` | `usize`  | `4`                | Number of remote log segments to prefetch               |
 | `remote_file_download_thread_num` | `usize`  | `3`                | Number of threads for remote log downloads              |
+| `scanner_log_max_poll_records`    | `usize`  | `500`              | Maximum number of records returned in a single poll()   |
 
 ## `FlussConnection`
 

Reply via email to