This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f549ef  perf: introduce streaming download for file download (#381)
0f549ef is described below

commit 0f549ef1526dfdafa410526a77d42347dadcfb66
Author: AlexZhao <[email protected]>
AuthorDate: Fri Feb 27 22:16:55 2026 +0800

    perf: introduce streaming download for file download (#381)
---
 bindings/cpp/include/fluss.hpp                     |   2 +
 bindings/cpp/src/ffi_converter.hpp                 |   1 +
 bindings/cpp/src/lib.rs                            |   6 +-
 bindings/python/fluss/__init__.pyi                 |   4 +
 bindings/python/src/config.rs                      |  20 ++++
 crates/fluss/src/client/table/remote_log.rs        | 103 +++++++++++++--------
 crates/fluss/src/client/table/scanner.rs           |   1 +
 crates/fluss/src/config.rs                         |   7 ++
 .../docs/user-guide/cpp/example/configuration.md   |   2 +
 .../user-guide/python/example/configuration.md     |  25 ++---
 .../docs/user-guide/rust/example/configuration.md  |   6 +-
 11 files changed, 121 insertions(+), 56 deletions(-)

diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 0f980b7..0a62af9 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -985,6 +985,8 @@ struct Configuration {
     size_t scanner_remote_log_prefetch_num{4};
     // Number of threads for downloading remote log data
     size_t remote_file_download_thread_num{3};
+    // Remote log read concurrency within one file (streaming read path)
+    size_t scanner_remote_log_read_concurrency{4};
     // Maximum number of records returned in a single call to Poll() for LogScanner
     size_t scanner_log_max_poll_records{500};
     int64_t writer_batch_timeout_ms{100};
diff --git a/bindings/cpp/src/ffi_converter.hpp 
b/bindings/cpp/src/ffi_converter.hpp
index e376343..9020027 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -54,6 +54,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& 
config) {
     ffi_config.writer_bucket_no_key_assigner = 
rust::String(config.writer_bucket_no_key_assigner);
     ffi_config.scanner_remote_log_prefetch_num = 
config.scanner_remote_log_prefetch_num;
     ffi_config.remote_file_download_thread_num = 
config.remote_file_download_thread_num;
+    ffi_config.scanner_remote_log_read_concurrency = config.scanner_remote_log_read_concurrency;
     ffi_config.scanner_log_max_poll_records = 
config.scanner_log_max_poll_records;
     ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;
     return ffi_config;
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index ea1307e..9b01d32 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -46,6 +46,7 @@ mod ffi {
         writer_bucket_no_key_assigner: String,
         scanner_remote_log_prefetch_num: usize,
         remote_file_download_thread_num: usize,
+        scanner_remote_log_read_concurrency: usize,
         scanner_log_max_poll_records: usize,
         writer_batch_timeout_ms: i64,
     }
@@ -618,7 +619,7 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut 
Connection, String> {
             ));
         }
     };
-    let config = fluss::config::Config {
+    let config_core = fluss::config::Config {
         bootstrap_servers: config.bootstrap_servers.to_string(),
         writer_request_max_size: config.writer_request_max_size,
         writer_acks: config.writer_acks.to_string(),
@@ -628,10 +629,11 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut 
Connection, String> {
         writer_bucket_no_key_assigner: assigner_type,
         scanner_remote_log_prefetch_num: 
config.scanner_remote_log_prefetch_num,
         remote_file_download_thread_num: 
config.remote_file_download_thread_num,
+        scanner_remote_log_read_concurrency: config.scanner_remote_log_read_concurrency,
         scanner_log_max_poll_records: config.scanner_log_max_poll_records,
     };
 
-    let conn = RUNTIME.block_on(async { fcore::client::FlussConnection::new(config).await });
+    let conn = RUNTIME.block_on(async { fcore::client::FlussConnection::new(config_core).await });
 
     match conn {
         Ok(c) => {
diff --git a/bindings/python/fluss/__init__.pyi 
b/bindings/python/fluss/__init__.pyi
index 6f9ae0b..514d011 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -162,6 +162,10 @@ class Config:
     @remote_file_download_thread_num.setter
     def remote_file_download_thread_num(self, num: int) -> None: ...
     @property
+    def scanner_remote_log_read_concurrency(self) -> int: ...
+    @scanner_remote_log_read_concurrency.setter
+    def scanner_remote_log_read_concurrency(self, num: int) -> None: ...
+    @property
     def scanner_log_max_poll_records(self) -> int: ...
     @scanner_log_max_poll_records.setter
     def scanner_log_max_poll_records(self, num: int) -> None: ...
diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs
index 5b7f2d3..9c0059e 100644
--- a/bindings/python/src/config.rs
+++ b/bindings/python/src/config.rs
@@ -81,6 +81,14 @@ impl Config {
                                 ))
                             })?;
                     }
+                    "scanner.remote-log.read-concurrency" => {
+                        config.scanner_remote_log_read_concurrency =
+                            value.parse::<usize>().map_err(|e| {
+                                FlussError::new_err(format!(
+                                    "Invalid value '{value}' for '{key}': {e}"
+                                ))
+                            })?;
+                    }
                     "scanner.log.max-poll-records" => {
                         config.scanner_log_max_poll_records =
                             value.parse::<usize>().map_err(|e| {
@@ -194,6 +202,18 @@ impl Config {
         self.inner.remote_file_download_thread_num = num;
     }
 
+    /// Get the scanner remote log read concurrency
+    #[getter]
+    fn scanner_remote_log_read_concurrency(&self) -> usize {
+        self.inner.scanner_remote_log_read_concurrency
+    }
+
+    /// Set the scanner remote log read concurrency
+    #[setter]
+    fn set_scanner_remote_log_read_concurrency(&mut self, num: usize) {
+        self.inner.scanner_remote_log_read_concurrency = num;
+    }
+
     /// Get the scanner log max poll records
     #[getter]
     fn scanner_log_max_poll_records(&self) -> usize {
diff --git a/crates/fluss/src/client/table/remote_log.rs 
b/crates/fluss/src/client/table/remote_log.rs
index 02820d9..6bc9551 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -19,9 +19,10 @@ use crate::error::{Error, Result};
 use crate::io::{FileIO, Storage};
 use crate::metadata::TableBucket;
 use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
+use futures::TryStreamExt;
 use parking_lot::Mutex;
 use std::{
-    cmp::{Ordering, Reverse, min},
+    cmp::{Ordering, Reverse},
     collections::{BinaryHeap, HashMap},
     future::Future,
     io, mem,
@@ -293,6 +294,7 @@ enum DownloadResult {
 struct ProductionFetcher {
     credentials_rx: CredentialsReceiver,
     local_log_dir: Arc<TempDir>,
+    remote_log_read_concurrency: usize,
 }
 
 impl RemoteLogFetcher for ProductionFetcher {
@@ -302,6 +304,7 @@ impl RemoteLogFetcher for ProductionFetcher {
     ) -> Pin<Box<dyn Future<Output = Result<FetchResult>> + Send>> {
         let mut credentials_rx = self.credentials_rx.clone();
         let local_log_dir = self.local_log_dir.clone();
+        let remote_log_read_concurrency = self.remote_log_read_concurrency;
 
         // Clone data needed for async operation to avoid lifetime issues
         let segment = request.segment.clone();
@@ -361,6 +364,7 @@ impl RemoteLogFetcher for ProductionFetcher {
                 &remote_path,
                 &local_file_path,
                 &remote_fs_props,
+                remote_log_read_concurrency,
             )
             .await?;
 
@@ -768,11 +772,13 @@ impl RemoteLogDownloader {
         local_log_dir: TempDir,
         max_prefetch_segments: usize,
         max_concurrent_downloads: usize,
+        remote_log_read_concurrency: usize,
         credentials_rx: CredentialsReceiver,
     ) -> Result<Self> {
         let fetcher = Arc::new(ProductionFetcher {
             credentials_rx,
             local_log_dir: Arc::new(local_log_dir),
+            remote_log_read_concurrency: remote_log_read_concurrency.max(1),
         });
 
         Self::new_with_fetcher(fetcher, max_prefetch_segments, 
max_concurrent_downloads)
@@ -848,12 +854,13 @@ impl Drop for RemoteLogDownloader {
 }
 
 impl RemoteLogDownloader {
-    /// Download a file from remote storage to local using streaming read/write
+    /// Download a file from remote storage to local using streaming read/write.
     async fn download_file(
         remote_log_tablet_dir: &str,
         remote_path: &str,
         local_path: &Path,
         remote_fs_props: &HashMap<String, String>,
+        remote_log_read_concurrency: usize,
     ) -> Result<PathBuf> {
         // Handle both URL (e.g., "s3://bucket/path") and local file paths
         // If the path doesn't contain "://", treat it as a local file path
@@ -886,56 +893,70 @@ impl RemoteLogDownloader {
 
         // Timeout for remote storage operations (30 seconds)
         const REMOTE_OP_TIMEOUT: Duration = Duration::from_secs(30);
+        const CHUNK_SIZE: usize = 8 * 1024 * 1024; // 8MiB
+
+        Self::download_file_streaming(
+            &op,
+            relative_path,
+            remote_path,
+            local_path,
+            CHUNK_SIZE,
+            remote_log_read_concurrency,
+            REMOTE_OP_TIMEOUT,
+        )
+        .await?;
 
-        // Get file metadata to know the size with timeout
-        let meta = op.stat(relative_path).await?;
-        let file_size = meta.content_length();
+        Ok(local_path.to_path_buf())
+    }
 
-        // Create local file for writing
+    async fn download_file_streaming(
+        op: &opendal::Operator,
+        relative_path: &str,
+        remote_path: &str,
+        local_path: &Path,
+        chunk_size: usize,
+        streaming_read_concurrency: usize,
+        remote_op_timeout: Duration,
+    ) -> Result<()> {
         let mut local_file = tokio::fs::File::create(local_path).await?;
 
-        // Stream data from remote to local file in chunks
-        // opendal::Reader::read accepts a range, so we read in chunks
-        const CHUNK_SIZE: u64 = 8 * 1024 * 1024; // 8MB chunks for efficient reading
-        let mut offset = 0u64;
-        let mut chunk_count = 0u64;
-        let total_chunks = file_size.div_ceil(CHUNK_SIZE);
+        let reader_future = op
+            .reader_with(relative_path)
+            .chunk(chunk_size)
+            .concurrent(streaming_read_concurrency);
+        let reader = tokio::time::timeout(remote_op_timeout, reader_future)
+            .await
+            .map_err(|e| Error::IoUnexpectedError {
+                message: format!("Timeout creating streaming reader for {remote_path}: {e}."),
+                source: io::ErrorKind::TimedOut.into(),
+            })??;
+
+        let mut stream = tokio::time::timeout(remote_op_timeout, reader.into_bytes_stream(..))
+            .await
+            .map_err(|e| Error::IoUnexpectedError {
+                message: format!("Timeout creating streaming bytes stream for {remote_path}: {e}."),
+                source: io::ErrorKind::TimedOut.into(),
+            })??;
 
-        while offset < file_size {
-            let end = min(offset + CHUNK_SIZE, file_size);
-            let range = offset..end;
+        let mut chunk_count = 0u64;
+        while let Some(chunk) = tokio::time::timeout(remote_op_timeout, stream.try_next())
+            .await
+            .map_err(|e| Error::IoUnexpectedError {
+                message: format!(
+                    "Timeout streaming chunk from remote storage: {remote_path}, exception: {e}."
+                ),
+                source: io::ErrorKind::TimedOut.into(),
+            })??
+        {
             chunk_count += 1;
-
             if chunk_count <= 3 || chunk_count % 10 == 0 {
-                log::debug!(
-                    "Remote log download: reading chunk {chunk_count}/{total_chunks} (offset {offset})"
-                );
+                log::debug!("Remote log streaming download: chunk #{chunk_count} ({remote_path})");
             }
-
-            // Read chunk from remote storage with timeout
-            let read_future = op.read_with(relative_path).range(range.clone());
-            let chunk = tokio::time::timeout(REMOTE_OP_TIMEOUT, read_future)
-                .await
-                .map_err(|e| {
-                    Error::IoUnexpectedError {
-                        message: format!(
-                            "Timeout reading chunk from remote storage: 
{remote_path} at offset {offset}, exception: {e}."
-                        ),
-                        source: io::ErrorKind::TimedOut.into(),
-                    }
-                })??;
-            let bytes = chunk.to_bytes();
-
-            // Write chunk to local file
-            local_file.write_all(&bytes).await?;
-
-            offset = end;
+            local_file.write_all(&chunk).await?;
         }
 
-        // Ensure all data is flushed to disk
         local_file.sync_all().await?;
-
-        Ok(local_path.to_path_buf())
+        Ok(())
     }
 }
 
diff --git a/crates/fluss/src/client/table/scanner.rs 
b/crates/fluss/src/client/table/scanner.rs
index 3ec9106..e837ba7 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -677,6 +677,7 @@ impl LogFetcher {
             tmp_dir,
             config.scanner_remote_log_prefetch_num,
             config.remote_file_download_thread_num,
+            config.scanner_remote_log_read_concurrency,
             credentials_rx,
         )?);
 
diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs
index 3026493..a0d7e70 100644
--- a/crates/fluss/src/config.rs
+++ b/crates/fluss/src/config.rs
@@ -25,6 +25,7 @@ const DEFAULT_WRITER_BATCH_SIZE: i32 = 2 * 1024 * 1024;
 const DEFAULT_RETRIES: i32 = i32::MAX;
 const DEFAULT_PREFETCH_NUM: usize = 4;
 const DEFAULT_DOWNLOAD_THREADS: usize = 3;
+const DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY: usize = 4;
 const DEFAULT_MAX_POLL_RECORDS: usize = 500;
 const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100;
 
@@ -81,6 +82,11 @@ pub struct Config {
     #[arg(long, default_value_t = DEFAULT_DOWNLOAD_THREADS)]
     pub remote_file_download_thread_num: usize,
 
+    /// Intra-file remote log read concurrency for each remote segment download.
+    /// The download path always uses the streaming reader.
+    #[arg(long, default_value_t = DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY)]
+    pub scanner_remote_log_read_concurrency: usize,
+
     /// Maximum number of records returned in a single call to poll() for 
LogScanner.
     /// Default: 500 (matching Java CLIENT_SCANNER_LOG_MAX_POLL_RECORDS)
     #[arg(long, default_value_t = DEFAULT_MAX_POLL_RECORDS)]
@@ -103,6 +109,7 @@ impl Default for Config {
             writer_bucket_no_key_assigner: NoKeyAssigner::Sticky,
             scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM,
             remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS,
+            scanner_remote_log_read_concurrency: DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY,
             scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
             writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS,
         }
diff --git a/website/docs/user-guide/cpp/example/configuration.md 
b/website/docs/user-guide/cpp/example/configuration.md
index d73661a..2245ee1 100644
--- a/website/docs/user-guide/cpp/example/configuration.md
+++ b/website/docs/user-guide/cpp/example/configuration.md
@@ -34,4 +34,6 @@ config.writer_batch_timeout_ms = 100;           // Max time 
to wait for a batch
 config.writer_bucket_no_key_assigner = "sticky"; // "sticky" or "round_robin"
 config.scanner_remote_log_prefetch_num = 4;      // Remote log prefetch count
 config.remote_file_download_thread_num = 3;  // Download threads
+config.scanner_remote_log_read_concurrency = 4;   // In-file remote log read concurrency
+config.scanner_log_max_poll_records = 500;        // Max records returned per poll()
 ```
diff --git a/website/docs/user-guide/python/example/configuration.md 
b/website/docs/user-guide/python/example/configuration.md
index 71e7199..39c53be 100644
--- a/website/docs/user-guide/python/example/configuration.md
+++ b/website/docs/user-guide/python/example/configuration.md
@@ -21,18 +21,19 @@ with await fluss.FlussConnection.create(config) as conn:
 
 ## Connection Configurations
 
-| Key                                | Description                             
                                             | Default            |
-|------------------------------------|--------------------------------------------------------------------------------------|--------------------|
-| `bootstrap.servers`               | Coordinator server address               
                                            | `127.0.0.1:9123`   |
-| `writer.request-max-size`         | Maximum request size in bytes            
                                            | `10485760` (10 MB) |
-| `writer.acks`                     | Acknowledgment setting (`all` waits for 
all replicas)                                | `all`              |
-| `writer.retries`                  | Number of retries on failure             
                                            | `2147483647`       |
-| `writer.batch-size`               | Batch size for writes in bytes           
                                            | `2097152` (2 MB)   |
-| `writer.bucket.no-key-assigner`   | Bucket assignment strategy for tables 
without bucket keys: `sticky` or `round_robin` | `sticky`           |
-| `scanner.remote-log.prefetch-num` | Number of remote log segments to 
prefetch                                            | `4`                |
-| `remote-file.download-thread-num` | Number of threads for remote log 
downloads                                           | `3`                |
-| `scanner.log.max-poll-records`    | Max records returned in a single poll()  
                                            | `500`              |
-| `writer.batch-timeout-ms`         | The maximum time to wait for a writer 
batch to fill up before sending.               | `100`              |
+| Key                                   | Description                          
                                                 | Default            |
+|---------------------------------------|---------------------------------------------------------------------------------------|--------------------|
+| `bootstrap.servers`                   | Coordinator server address           
                                                 | `127.0.0.1:9123`   |
+| `writer.request-max-size`             | Maximum request size in bytes        
                                                 | `10485760` (10 MB) |
+| `writer.acks`                         | Acknowledgment setting (`all` waits 
for all replicas)                                 | `all`              |
+| `writer.retries`                      | Number of retries on failure         
                                                 | `2147483647`       |
+| `writer.batch-size`                   | Batch size for writes in bytes       
                                                 | `2097152` (2 MB)   |
+| `writer.batch-timeout-ms`             | The maximum time to wait for a 
writer batch to fill up before sending.               | `100`              |
+| `writer.bucket.no-key-assigner`       | Bucket assignment strategy for 
tables without bucket keys: `sticky` or `round_robin` | `sticky`           |
+| `scanner.remote-log.prefetch-num`     | Number of remote log segments to 
prefetch                                             | `4`                |
+| `remote-file.download-thread-num`     | Number of threads for remote log 
downloads                                            | `3`                |
+| `scanner.remote-log.read-concurrency` | Streaming read concurrency within a 
remote log file                                   | `4`                |
+| `scanner.log.max-poll-records`        | Max records returned in a single 
poll()                                               | `500`              |
 
 Remember to close the connection when done:
 
diff --git a/website/docs/user-guide/rust/example/configuration.md 
b/website/docs/user-guide/rust/example/configuration.md
index 7d7cc93..a2f52dc 100644
--- a/website/docs/user-guide/rust/example/configuration.md
+++ b/website/docs/user-guide/rust/example/configuration.md
@@ -24,5 +24,9 @@ let conn = FlussConnection::new(config).await?;
 | `writer_acks`                   | Acknowledgment setting (`all` waits for 
all replicas)                                | `all`            |
 | `writer_retries`                | Number of retries on failure               
                                          | `i32::MAX`       |
 | `writer_batch_size`             | Batch size for writes                      
                                          | 2 MB             |
-| `writer_bucket_no_key_assigner` | Bucket assignment strategy for tables 
without bucket keys: `sticky` or `round_robin` | `sticky`         |
 | `writer_batch_timeout_ms`       | The maximum time to wait for a writer 
batch to fill up before sending.               | `100`            |
+| `writer_bucket_no_key_assigner` | Bucket assignment strategy for tables 
without bucket keys: `sticky` or `round_robin` | `sticky`         |
+| `scanner_remote_log_prefetch_num` | Number of remote log segments to 
prefetch                                           | `4`              |
+| `remote_file_download_thread_num` | Number of concurrent remote log file 
downloads                                      | `3`              |
+| `scanner_remote_log_read_concurrency` | Streaming read concurrency within a 
remote log file                           | `4`              |
+| `scanner_log_max_poll_records`  | Maximum records returned in a single 
`poll()`                                       | `500`            |

Reply via email to