This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 0f549ef perf: introduce streaming download for file download (#381)
0f549ef is described below
commit 0f549ef1526dfdafa410526a77d42347dadcfb66
Author: AlexZhao <[email protected]>
AuthorDate: Fri Feb 27 22:16:55 2026 +0800
perf: introduce streaming download for file download (#381)
---
bindings/cpp/include/fluss.hpp | 2 +
bindings/cpp/src/ffi_converter.hpp | 1 +
bindings/cpp/src/lib.rs | 6 +-
bindings/python/fluss/__init__.pyi | 4 +
bindings/python/src/config.rs | 20 ++++
crates/fluss/src/client/table/remote_log.rs | 103 +++++++++++++--------
crates/fluss/src/client/table/scanner.rs | 1 +
crates/fluss/src/config.rs | 7 ++
.../docs/user-guide/cpp/example/configuration.md | 2 +
.../user-guide/python/example/configuration.md | 25 ++---
.../docs/user-guide/rust/example/configuration.md | 6 +-
11 files changed, 121 insertions(+), 56 deletions(-)
diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp
index 0f980b7..0a62af9 100644
--- a/bindings/cpp/include/fluss.hpp
+++ b/bindings/cpp/include/fluss.hpp
@@ -985,6 +985,8 @@ struct Configuration {
size_t scanner_remote_log_prefetch_num{4};
// Number of threads for downloading remote log data
size_t remote_file_download_thread_num{3};
+ // Remote log read concurrency within one file (streaming read path)
+ size_t scanner_remote_log_read_concurrency{4};
// Maximum number of records returned in a single call to Poll() for
LogScanner
size_t scanner_log_max_poll_records{500};
int64_t writer_batch_timeout_ms{100};
diff --git a/bindings/cpp/src/ffi_converter.hpp
b/bindings/cpp/src/ffi_converter.hpp
index e376343..9020027 100644
--- a/bindings/cpp/src/ffi_converter.hpp
+++ b/bindings/cpp/src/ffi_converter.hpp
@@ -54,6 +54,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration&
config) {
ffi_config.writer_bucket_no_key_assigner =
rust::String(config.writer_bucket_no_key_assigner);
ffi_config.scanner_remote_log_prefetch_num =
config.scanner_remote_log_prefetch_num;
ffi_config.remote_file_download_thread_num =
config.remote_file_download_thread_num;
+ ffi_config.scanner_remote_log_read_concurrency =
config.scanner_remote_log_read_concurrency;
ffi_config.scanner_log_max_poll_records =
config.scanner_log_max_poll_records;
ffi_config.writer_batch_timeout_ms = config.writer_batch_timeout_ms;
return ffi_config;
diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs
index ea1307e..9b01d32 100644
--- a/bindings/cpp/src/lib.rs
+++ b/bindings/cpp/src/lib.rs
@@ -46,6 +46,7 @@ mod ffi {
writer_bucket_no_key_assigner: String,
scanner_remote_log_prefetch_num: usize,
remote_file_download_thread_num: usize,
+ scanner_remote_log_read_concurrency: usize,
scanner_log_max_poll_records: usize,
writer_batch_timeout_ms: i64,
}
@@ -618,7 +619,7 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut
Connection, String> {
));
}
};
- let config = fluss::config::Config {
+ let config_core = fluss::config::Config {
bootstrap_servers: config.bootstrap_servers.to_string(),
writer_request_max_size: config.writer_request_max_size,
writer_acks: config.writer_acks.to_string(),
@@ -628,10 +629,11 @@ fn new_connection(config: &ffi::FfiConfig) -> Result<*mut
Connection, String> {
writer_bucket_no_key_assigner: assigner_type,
scanner_remote_log_prefetch_num:
config.scanner_remote_log_prefetch_num,
remote_file_download_thread_num:
config.remote_file_download_thread_num,
+ scanner_remote_log_read_concurrency:
config.scanner_remote_log_read_concurrency,
scanner_log_max_poll_records: config.scanner_log_max_poll_records,
};
- let conn = RUNTIME.block_on(async {
fcore::client::FlussConnection::new(config).await });
+ let conn = RUNTIME.block_on(async {
fcore::client::FlussConnection::new(config_core).await });
match conn {
Ok(c) => {
diff --git a/bindings/python/fluss/__init__.pyi
b/bindings/python/fluss/__init__.pyi
index 6f9ae0b..514d011 100644
--- a/bindings/python/fluss/__init__.pyi
+++ b/bindings/python/fluss/__init__.pyi
@@ -162,6 +162,10 @@ class Config:
@remote_file_download_thread_num.setter
def remote_file_download_thread_num(self, num: int) -> None: ...
@property
+ def scanner_remote_log_read_concurrency(self) -> int: ...
+ @scanner_remote_log_read_concurrency.setter
+ def scanner_remote_log_read_concurrency(self, num: int) -> None: ...
+ @property
def scanner_log_max_poll_records(self) -> int: ...
@scanner_log_max_poll_records.setter
def scanner_log_max_poll_records(self, num: int) -> None: ...
diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs
index 5b7f2d3..9c0059e 100644
--- a/bindings/python/src/config.rs
+++ b/bindings/python/src/config.rs
@@ -81,6 +81,14 @@ impl Config {
))
})?;
}
+ "scanner.remote-log.read-concurrency" => {
+ config.scanner_remote_log_read_concurrency =
+ value.parse::<usize>().map_err(|e| {
+ FlussError::new_err(format!(
+ "Invalid value '{value}' for '{key}': {e}"
+ ))
+ })?;
+ }
"scanner.log.max-poll-records" => {
config.scanner_log_max_poll_records =
value.parse::<usize>().map_err(|e| {
@@ -194,6 +202,18 @@ impl Config {
self.inner.remote_file_download_thread_num = num;
}
+ /// Get the scanner remote log read concurrency
+ #[getter]
+ fn scanner_remote_log_read_concurrency(&self) -> usize {
+ self.inner.scanner_remote_log_read_concurrency
+ }
+
+ /// Set the scanner remote log read concurrency
+ #[setter]
+ fn set_scanner_remote_log_read_concurrency(&mut self, num: usize) {
+ self.inner.scanner_remote_log_read_concurrency = num;
+ }
+
/// Get the scanner log max poll records
#[getter]
fn scanner_log_max_poll_records(&self) -> usize {
diff --git a/crates/fluss/src/client/table/remote_log.rs
b/crates/fluss/src/client/table/remote_log.rs
index 02820d9..6bc9551 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -19,9 +19,10 @@ use crate::error::{Error, Result};
use crate::io::{FileIO, Storage};
use crate::metadata::TableBucket;
use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
+use futures::TryStreamExt;
use parking_lot::Mutex;
use std::{
- cmp::{Ordering, Reverse, min},
+ cmp::{Ordering, Reverse},
collections::{BinaryHeap, HashMap},
future::Future,
io, mem,
@@ -293,6 +294,7 @@ enum DownloadResult {
struct ProductionFetcher {
credentials_rx: CredentialsReceiver,
local_log_dir: Arc<TempDir>,
+ remote_log_read_concurrency: usize,
}
impl RemoteLogFetcher for ProductionFetcher {
@@ -302,6 +304,7 @@ impl RemoteLogFetcher for ProductionFetcher {
) -> Pin<Box<dyn Future<Output = Result<FetchResult>> + Send>> {
let mut credentials_rx = self.credentials_rx.clone();
let local_log_dir = self.local_log_dir.clone();
+ let remote_log_read_concurrency = self.remote_log_read_concurrency;
// Clone data needed for async operation to avoid lifetime issues
let segment = request.segment.clone();
@@ -361,6 +364,7 @@ impl RemoteLogFetcher for ProductionFetcher {
&remote_path,
&local_file_path,
&remote_fs_props,
+ remote_log_read_concurrency,
)
.await?;
@@ -768,11 +772,13 @@ impl RemoteLogDownloader {
local_log_dir: TempDir,
max_prefetch_segments: usize,
max_concurrent_downloads: usize,
+ remote_log_read_concurrency: usize,
credentials_rx: CredentialsReceiver,
) -> Result<Self> {
let fetcher = Arc::new(ProductionFetcher {
credentials_rx,
local_log_dir: Arc::new(local_log_dir),
+ remote_log_read_concurrency: remote_log_read_concurrency.max(1),
});
Self::new_with_fetcher(fetcher, max_prefetch_segments,
max_concurrent_downloads)
@@ -848,12 +854,13 @@ impl Drop for RemoteLogDownloader {
}
impl RemoteLogDownloader {
- /// Download a file from remote storage to local using streaming read/write
+ /// Download a file from remote storage to local using streaming
read/write.
async fn download_file(
remote_log_tablet_dir: &str,
remote_path: &str,
local_path: &Path,
remote_fs_props: &HashMap<String, String>,
+ remote_log_read_concurrency: usize,
) -> Result<PathBuf> {
// Handle both URL (e.g., "s3://bucket/path") and local file paths
// If the path doesn't contain "://", treat it as a local file path
@@ -886,56 +893,70 @@ impl RemoteLogDownloader {
// Timeout for remote storage operations (30 seconds)
const REMOTE_OP_TIMEOUT: Duration = Duration::from_secs(30);
+ const CHUNK_SIZE: usize = 8 * 1024 * 1024; // 8MiB
+
+ Self::download_file_streaming(
+ &op,
+ relative_path,
+ remote_path,
+ local_path,
+ CHUNK_SIZE,
+ remote_log_read_concurrency,
+ REMOTE_OP_TIMEOUT,
+ )
+ .await?;
- // Get file metadata to know the size with timeout
- let meta = op.stat(relative_path).await?;
- let file_size = meta.content_length();
+ Ok(local_path.to_path_buf())
+ }
- // Create local file for writing
+ async fn download_file_streaming(
+ op: &opendal::Operator,
+ relative_path: &str,
+ remote_path: &str,
+ local_path: &Path,
+ chunk_size: usize,
+ streaming_read_concurrency: usize,
+ remote_op_timeout: Duration,
+ ) -> Result<()> {
let mut local_file = tokio::fs::File::create(local_path).await?;
- // Stream data from remote to local file in chunks
- // opendal::Reader::read accepts a range, so we read in chunks
- const CHUNK_SIZE: u64 = 8 * 1024 * 1024; // 8MB chunks for efficient
reading
- let mut offset = 0u64;
- let mut chunk_count = 0u64;
- let total_chunks = file_size.div_ceil(CHUNK_SIZE);
+ let reader_future = op
+ .reader_with(relative_path)
+ .chunk(chunk_size)
+ .concurrent(streaming_read_concurrency);
+ let reader = tokio::time::timeout(remote_op_timeout, reader_future)
+ .await
+ .map_err(|e| Error::IoUnexpectedError {
+ message: format!("Timeout creating streaming reader for
{remote_path}: {e}."),
+ source: io::ErrorKind::TimedOut.into(),
+ })??;
+
+ let mut stream = tokio::time::timeout(remote_op_timeout,
reader.into_bytes_stream(..))
+ .await
+ .map_err(|e| Error::IoUnexpectedError {
+ message: format!("Timeout creating streaming bytes stream for
{remote_path}: {e}."),
+ source: io::ErrorKind::TimedOut.into(),
+ })??;
- while offset < file_size {
- let end = min(offset + CHUNK_SIZE, file_size);
- let range = offset..end;
+ let mut chunk_count = 0u64;
+ while let Some(chunk) = tokio::time::timeout(remote_op_timeout,
stream.try_next())
+ .await
+ .map_err(|e| Error::IoUnexpectedError {
+ message: format!(
+ "Timeout streaming chunk from remote storage:
{remote_path}, exception: {e}."
+ ),
+ source: io::ErrorKind::TimedOut.into(),
+ })??
+ {
chunk_count += 1;
-
if chunk_count <= 3 || chunk_count % 10 == 0 {
- log::debug!(
- "Remote log download: reading chunk
{chunk_count}/{total_chunks} (offset {offset})"
- );
+ log::debug!("Remote log streaming download: chunk
#{chunk_count} ({remote_path})");
}
-
- // Read chunk from remote storage with timeout
- let read_future = op.read_with(relative_path).range(range.clone());
- let chunk = tokio::time::timeout(REMOTE_OP_TIMEOUT, read_future)
- .await
- .map_err(|e| {
- Error::IoUnexpectedError {
- message: format!(
- "Timeout reading chunk from remote storage:
{remote_path} at offset {offset}, exception: {e}."
- ),
- source: io::ErrorKind::TimedOut.into(),
- }
- })??;
- let bytes = chunk.to_bytes();
-
- // Write chunk to local file
- local_file.write_all(&bytes).await?;
-
- offset = end;
+ local_file.write_all(&chunk).await?;
}
- // Ensure all data is flushed to disk
local_file.sync_all().await?;
-
- Ok(local_path.to_path_buf())
+ Ok(())
}
}
diff --git a/crates/fluss/src/client/table/scanner.rs
b/crates/fluss/src/client/table/scanner.rs
index 3ec9106..e837ba7 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -677,6 +677,7 @@ impl LogFetcher {
tmp_dir,
config.scanner_remote_log_prefetch_num,
config.remote_file_download_thread_num,
+ config.scanner_remote_log_read_concurrency,
credentials_rx,
)?);
diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs
index 3026493..a0d7e70 100644
--- a/crates/fluss/src/config.rs
+++ b/crates/fluss/src/config.rs
@@ -25,6 +25,7 @@ const DEFAULT_WRITER_BATCH_SIZE: i32 = 2 * 1024 * 1024;
const DEFAULT_RETRIES: i32 = i32::MAX;
const DEFAULT_PREFETCH_NUM: usize = 4;
const DEFAULT_DOWNLOAD_THREADS: usize = 3;
+const DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY: usize = 4;
const DEFAULT_MAX_POLL_RECORDS: usize = 500;
const DEFAULT_WRITER_BATCH_TIMEOUT_MS: i64 = 100;
@@ -81,6 +82,11 @@ pub struct Config {
#[arg(long, default_value_t = DEFAULT_DOWNLOAD_THREADS)]
pub remote_file_download_thread_num: usize,
+ /// Intra-file remote log read concurrency for each remote segment
download.
+ /// The download path always uses a streaming reader.
+ #[arg(long, default_value_t = DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY)]
+ pub scanner_remote_log_read_concurrency: usize,
+
/// Maximum number of records returned in a single call to poll() for
LogScanner.
/// Default: 500 (matching Java CLIENT_SCANNER_LOG_MAX_POLL_RECORDS)
#[arg(long, default_value_t = DEFAULT_MAX_POLL_RECORDS)]
@@ -103,6 +109,7 @@ impl Default for Config {
writer_bucket_no_key_assigner: NoKeyAssigner::Sticky,
scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM,
remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS,
+ scanner_remote_log_read_concurrency:
DEFAULT_SCANNER_REMOTE_LOG_READ_CONCURRENCY,
scanner_log_max_poll_records: DEFAULT_MAX_POLL_RECORDS,
writer_batch_timeout_ms: DEFAULT_WRITER_BATCH_TIMEOUT_MS,
}
diff --git a/website/docs/user-guide/cpp/example/configuration.md
b/website/docs/user-guide/cpp/example/configuration.md
index d73661a..2245ee1 100644
--- a/website/docs/user-guide/cpp/example/configuration.md
+++ b/website/docs/user-guide/cpp/example/configuration.md
@@ -34,4 +34,6 @@ config.writer_batch_timeout_ms = 100; // Max time
to wait for a batch
config.writer_bucket_no_key_assigner = "sticky"; // "sticky" or "round_robin"
config.scanner_remote_log_prefetch_num = 4; // Remote log prefetch count
config.remote_file_download_thread_num = 3; // Download threads
+config.scanner_remote_log_read_concurrency = 4; // In-file remote log read
concurrency
+config.scanner_log_max_poll_records = 500; // Max records returned per
poll()
```
diff --git a/website/docs/user-guide/python/example/configuration.md
b/website/docs/user-guide/python/example/configuration.md
index 71e7199..39c53be 100644
--- a/website/docs/user-guide/python/example/configuration.md
+++ b/website/docs/user-guide/python/example/configuration.md
@@ -21,18 +21,19 @@ with await fluss.FlussConnection.create(config) as conn:
## Connection Configurations
-| Key | Description
| Default |
-|------------------------------------|--------------------------------------------------------------------------------------|--------------------|
-| `bootstrap.servers` | Coordinator server address
| `127.0.0.1:9123` |
-| `writer.request-max-size` | Maximum request size in bytes
| `10485760` (10 MB) |
-| `writer.acks` | Acknowledgment setting (`all` waits for
all replicas) | `all` |
-| `writer.retries` | Number of retries on failure
| `2147483647` |
-| `writer.batch-size` | Batch size for writes in bytes
| `2097152` (2 MB) |
-| `writer.bucket.no-key-assigner` | Bucket assignment strategy for tables
without bucket keys: `sticky` or `round_robin` | `sticky` |
-| `scanner.remote-log.prefetch-num` | Number of remote log segments to
prefetch | `4` |
-| `remote-file.download-thread-num` | Number of threads for remote log
downloads | `3` |
-| `scanner.log.max-poll-records` | Max records returned in a single poll()
| `500` |
-| `writer.batch-timeout-ms` | The maximum time to wait for a writer
batch to fill up before sending. | `100` |
+| Key | Description
| Default |
+|---------------------------------------|---------------------------------------------------------------------------------------|--------------------|
+| `bootstrap.servers` | Coordinator server address
| `127.0.0.1:9123` |
+| `writer.request-max-size` | Maximum request size in bytes
| `10485760` (10 MB) |
+| `writer.acks` | Acknowledgment setting (`all` waits
for all replicas) | `all` |
+| `writer.retries` | Number of retries on failure
| `2147483647` |
+| `writer.batch-size` | Batch size for writes in bytes
| `2097152` (2 MB) |
+| `writer.batch-timeout-ms` | The maximum time to wait for a
writer batch to fill up before sending. | `100` |
+| `writer.bucket.no-key-assigner` | Bucket assignment strategy for
tables without bucket keys: `sticky` or `round_robin` | `sticky` |
+| `scanner.remote-log.prefetch-num` | Number of remote log segments to
prefetch | `4` |
+| `remote-file.download-thread-num` | Number of threads for remote log
downloads | `3` |
+| `scanner.remote-log.read-concurrency` | Streaming read concurrency within a
remote log file | `4` |
+| `scanner.log.max-poll-records` | Max records returned in a single
poll() | `500` |
Remember to close the connection when done:
diff --git a/website/docs/user-guide/rust/example/configuration.md
b/website/docs/user-guide/rust/example/configuration.md
index 7d7cc93..a2f52dc 100644
--- a/website/docs/user-guide/rust/example/configuration.md
+++ b/website/docs/user-guide/rust/example/configuration.md
@@ -24,5 +24,9 @@ let conn = FlussConnection::new(config).await?;
| `writer_acks` | Acknowledgment setting (`all` waits for
all replicas) | `all` |
| `writer_retries` | Number of retries on failure
| `i32::MAX` |
| `writer_batch_size` | Batch size for writes
| 2 MB |
-| `writer_bucket_no_key_assigner` | Bucket assignment strategy for tables
without bucket keys: `sticky` or `round_robin` | `sticky` |
| `writer_batch_timeout_ms` | The maximum time to wait for a writer
batch to fill up before sending. | `100` |
+| `writer_bucket_no_key_assigner` | Bucket assignment strategy for tables
without bucket keys: `sticky` or `round_robin` | `sticky` |
+| `scanner_remote_log_prefetch_num` | Number of remote log segments to
prefetch | `4` |
+| `remote_file_download_thread_num` | Number of concurrent remote log file
downloads | `3` |
+| `scanner_remote_log_read_concurrency` | Streaming read concurrency within a
remote log file | `4` |
+| `scanner_log_max_poll_records` | Maximum records returned in a single
`poll()` | `500` |