This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 7ee9bfe feat: introduce priority queue for downloading remote
segments (#187)
7ee9bfe is described below
commit 7ee9bfe5b9fbef568b40553354cc81efd7071214
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Jan 25 01:09:12 2026 +0000
feat: introduce priority queue for downloading remote segments (#187)
---
crates/fluss/src/client/connection.rs | 4 +
crates/fluss/src/client/table/log_fetch_buffer.rs | 195 +++-
crates/fluss/src/client/table/mod.rs | 3 +
crates/fluss/src/client/table/remote_log.rs | 1087 ++++++++++++++++++---
crates/fluss/src/client/table/scanner.rs | 44 +-
crates/fluss/src/config.rs | 12 +
crates/fluss/src/proto/fluss_api.proto | 1 +
crates/fluss/src/record/arrow.rs | 432 +++++++-
crates/fluss/src/util/mod.rs | 9 +-
9 files changed, 1611 insertions(+), 176 deletions(-)
diff --git a/crates/fluss/src/client/connection.rs
b/crates/fluss/src/client/connection.rs
index 595daf5..0e41bbe 100644
--- a/crates/fluss/src/client/connection.rs
+++ b/crates/fluss/src/client/connection.rs
@@ -59,6 +59,10 @@ impl FlussConnection {
self.network_connects.clone()
}
+ pub fn config(&self) -> &Config {
+ &self.args
+ }
+
pub async fn get_admin(&self) -> Result<FlussAdmin> {
FlussAdmin::new(self.network_connects.clone(),
self.metadata.clone()).await
}
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs
b/crates/fluss/src/client/table/log_fetch_buffer.rs
index 214a79c..4a64eda 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -18,15 +18,22 @@
use arrow::array::RecordBatch;
use parking_lot::Mutex;
+use crate::client::table::remote_log::{
+ PrefetchPermit, RemoteLogDownloadFuture, RemoteLogFile, RemoteLogSegment,
+};
use crate::error::{ApiError, Error, Result};
use crate::metadata::TableBucket;
use crate::record::{
LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext,
ScanRecord,
};
-use std::collections::{HashMap, VecDeque};
-use std::sync::Arc;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::time::Duration;
+use std::{
+ collections::{HashMap, VecDeque},
+ sync::{
+ Arc,
+ atomic::{AtomicBool, Ordering},
+ },
+ time::{Duration, Instant},
+};
use tokio::sync::Notify;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -106,7 +113,7 @@ impl LogFetchBuffer {
/// Wait for the buffer to become non-empty, with timeout.
/// Returns true if data became available, false if timeout.
pub async fn await_not_empty(&self, timeout: Duration) -> Result<bool> {
- let deadline = std::time::Instant::now() + timeout;
+ let deadline = Instant::now() + timeout;
loop {
// Check if buffer is not empty
@@ -122,7 +129,7 @@ impl LogFetchBuffer {
}
// Check if timeout
- let now = std::time::Instant::now();
+ let now = Instant::now();
if now >= deadline {
return Ok(false);
}
@@ -325,6 +332,7 @@ impl PendingFetch for CompletedPendingFetch {
}
/// Default implementation of CompletedFetch for in-memory log records
+/// Used for local fetches from tablet server
pub struct DefaultCompletedFetch {
table_bucket: TableBucket,
api_error: Option<ApiError>,
@@ -441,7 +449,8 @@ impl DefaultCompletedFetch {
if record.offset() >= self.next_fetch_offset {
return Ok(Some(record));
}
- } else if let Some(batch) = self.log_record_batch.next() {
+ } else if let Some(batch_result) = self.log_record_batch.next() {
+ let batch = batch_result?;
self.current_record_iterator =
Some(batch.records(&self.read_context)?);
self.current_record_batch = Some(batch);
} else {
@@ -470,11 +479,12 @@ impl DefaultCompletedFetch {
/// Get the next batch directly without row iteration
fn next_fetched_batch(&mut self) -> Result<Option<RecordBatch>> {
loop {
- let Some(log_batch) = self.log_record_batch.next() else {
+ let Some(log_batch_result) = self.log_record_batch.next() else {
self.drain();
return Ok(None);
};
+ let log_batch = log_batch_result?;
let mut record_batch = log_batch.record_batch(&self.read_context)?;
// Skip empty batches
@@ -644,6 +654,174 @@ impl CompletedFetch for DefaultCompletedFetch {
}
}
+/// Completed fetch for remote log segments
+/// Matches Java's RemoteCompletedFetch design - separate class for remote vs
local
+/// Holds RAII permit until consumed (data is in inner)
+pub struct RemoteCompletedFetch {
+ inner: DefaultCompletedFetch,
+ permit: Option<PrefetchPermit>,
+}
+
+impl RemoteCompletedFetch {
+ pub fn new(inner: DefaultCompletedFetch, permit: PrefetchPermit) -> Self {
+ Self {
+ inner,
+ permit: Some(permit),
+ }
+ }
+}
+
+impl CompletedFetch for RemoteCompletedFetch {
+ fn table_bucket(&self) -> &TableBucket {
+ self.inner.table_bucket()
+ }
+
+ fn api_error(&self) -> Option<&ApiError> {
+ self.inner.api_error()
+ }
+
+ fn fetch_error_context(&self) -> Option<&FetchErrorContext> {
+ self.inner.fetch_error_context()
+ }
+
+ fn take_error(&mut self) -> Option<Error> {
+ self.inner.take_error()
+ }
+
+ fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>
{
+ self.inner.fetch_records(max_records)
+ }
+
+ fn fetch_batches(&mut self, max_batches: usize) ->
Result<Vec<RecordBatch>> {
+ self.inner.fetch_batches(max_batches)
+ }
+
+ fn is_consumed(&self) -> bool {
+ self.inner.is_consumed()
+ }
+
+ fn records_read(&self) -> usize {
+ self.inner.records_read()
+ }
+
+ fn drain(&mut self) {
+ self.inner.drain();
+ // Release permit immediately (don't wait for struct drop)
+ // Critical: allows prefetch to continue even if Box<dyn
CompletedFetch> kept around
+ self.permit.take(); // drops permit here, triggers recycle notification
+ }
+
+ fn size_in_bytes(&self) -> usize {
+ self.inner.size_in_bytes()
+ }
+
+ fn high_watermark(&self) -> i64 {
+ self.inner.high_watermark()
+ }
+
+ fn is_initialized(&self) -> bool {
+ self.inner.is_initialized()
+ }
+
+ fn set_initialized(&mut self) {
+ self.inner.set_initialized()
+ }
+
+ fn next_fetch_offset(&self) -> i64 {
+ self.inner.next_fetch_offset()
+ }
+}
+// Permit released explicitly in drain() or automatically when struct drops
+
+/// Pending fetch that waits for remote log file to be downloaded
+pub struct RemotePendingFetch {
+ segment: RemoteLogSegment,
+ download_future: RemoteLogDownloadFuture,
+ pos_in_log_segment: i32,
+ fetch_offset: i64,
+ high_watermark: i64,
+ read_context: ReadContext,
+}
+
+impl RemotePendingFetch {
+ pub fn new(
+ segment: RemoteLogSegment,
+ download_future: RemoteLogDownloadFuture,
+ pos_in_log_segment: i32,
+ fetch_offset: i64,
+ high_watermark: i64,
+ read_context: ReadContext,
+ ) -> Self {
+ Self {
+ segment,
+ download_future,
+ pos_in_log_segment,
+ fetch_offset,
+ high_watermark,
+ read_context,
+ }
+ }
+}
+
+impl PendingFetch for RemotePendingFetch {
+ fn table_bucket(&self) -> &TableBucket {
+ &self.segment.table_bucket
+ }
+
+ fn is_completed(&self) -> bool {
+ self.download_future.is_done()
+ }
+
+ fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+ // Take the RemoteLogFile and destructure
+ let remote_log_file = self.download_future.take_remote_log_file()?;
+ let RemoteLogFile {
+ file_path,
+ file_size: _,
+ permit,
+ } = remote_log_file;
+
+ // Open file for streaming (no memory allocation for entire file)
+ let file = std::fs::File::open(&file_path)?;
+ let file_size = file.metadata()?.len() as usize;
+
+ // Create file-backed LogRecordsBatches with cleanup (streaming!)
+ // Data will be read batch-by-batch on-demand, not all at once
+ // FileSource will delete the file when dropped (after file is closed)
+ let log_record_batch =
+ LogRecordsBatches::from_file(file, self.pos_in_log_segment as
usize, file_path)?;
+
+ // Calculate size based on position offset
+ let size_in_bytes = if self.pos_in_log_segment > 0 {
+ let pos = self.pos_in_log_segment as usize;
+ if pos >= file_size {
+ return Err(Error::UnexpectedError {
+ message: format!("Position {} exceeds file size {}", pos,
file_size),
+ source: None,
+ });
+ }
+ file_size - pos
+ } else {
+ file_size
+ };
+
+ // Create DefaultCompletedFetch
+ let inner_fetch = DefaultCompletedFetch::new(
+ self.segment.table_bucket.clone(),
+ log_record_batch,
+ size_in_bytes,
+ self.read_context,
+ self.fetch_offset,
+ self.high_watermark,
+ );
+
+ // Wrap it with RemoteCompletedFetch to hold the permit
+ // Permit manages the prefetch slot (releases semaphore and notifies
coordinator) when dropped;
+ // file deletion is handled by FileCleanupGuard in the file-backed
source created via from_file
+ Ok(Box::new(RemoteCompletedFetch::new(inner_fetch, permit)))
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -655,7 +833,6 @@ mod tests {
use crate::record::{MemoryLogRecordsArrowBuilder, ReadContext,
to_arrow_schema};
use crate::row::GenericRow;
use std::sync::Arc;
- use std::time::Duration;
fn test_read_context() -> Result<ReadContext> {
let row_type = RowType::new(vec![DataField::new(
diff --git a/crates/fluss/src/client/table/mod.rs
b/crates/fluss/src/client/table/mod.rs
index 2bfa054..2dc56d5 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -36,6 +36,9 @@ mod writer;
use crate::client::table::upsert::TableUpsert;
pub use append::{AppendWriter, TableAppend};
pub use lookup::{LookupResult, Lookuper, TableLookup};
+pub use remote_log::{
+ DEFAULT_SCANNER_REMOTE_LOG_DOWNLOAD_THREADS,
DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM,
+};
pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
pub use writer::{TableWriter, UpsertWriter};
diff --git a/crates/fluss/src/client/table/remote_log.rs
b/crates/fluss/src/client/table/remote_log.rs
index 0142515..c39056d 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -14,21 +14,84 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use crate::client::table::log_fetch_buffer::{CompletedFetch,
DefaultCompletedFetch, PendingFetch};
use crate::error::{Error, Result};
use crate::io::{FileIO, Storage};
use crate::metadata::TableBucket;
use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
-use crate::record::{LogRecordsBatches, ReadContext};
-use crate::util::delete_file;
use parking_lot::{Mutex, RwLock};
-use std::collections::HashMap;
-use std::io;
-use std::path::{Path, PathBuf};
-use std::sync::Arc;
+use std::{
+ cmp::{Ordering, Reverse, min},
+ collections::{BinaryHeap, HashMap},
+ future::Future,
+ io, mem,
+ path::{Path, PathBuf},
+ pin::Pin,
+ sync::Arc,
+ time::Duration,
+};
+
+#[cfg(test)]
+use std::{
+ env,
+ time::{SystemTime, UNIX_EPOCH},
+};
use tempfile::TempDir;
use tokio::io::AsyncWriteExt;
-use tokio::sync::oneshot;
+use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
+use tokio::task::JoinSet;
+
+/// Default maximum number of remote log segments to prefetch
+/// Matches Java's CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM (default: 4)
+pub const DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM: usize = 4;
+
+/// Default maximum concurrent remote log downloads
+/// Matches Java's REMOTE_FILE_DOWNLOAD_THREAD_NUM (default: 3)
+pub const DEFAULT_SCANNER_REMOTE_LOG_DOWNLOAD_THREADS: usize = 3;
+
+/// Initial retry backoff delay (milliseconds)
+/// Prevents hot-spin retry loops on persistent failures
+const RETRY_BACKOFF_BASE_MS: u64 = 100;
+
+/// Maximum retry backoff delay (milliseconds)
+/// Caps exponential backoff to avoid excessive delays
+const RETRY_BACKOFF_MAX_MS: u64 = 5_000;
+
+/// Maximum number of retries before giving up
+/// After this many retries, the download will fail permanently
+const MAX_RETRY_COUNT: u32 = 10;
+
+/// Calculate exponential backoff delay with jitter for retries
+fn calculate_backoff_delay(retry_count: u32) -> tokio::time::Duration {
+ use rand::Rng;
+
+ // Exponential backoff: base * 2^retry_count
+ let exponential_ms = RETRY_BACKOFF_BASE_MS.saturating_mul(1 <<
retry_count.min(10)); // Cap exponent to prevent overflow
+
+ // Cap at maximum
+ let capped_ms = exponential_ms.min(RETRY_BACKOFF_MAX_MS);
+
+ // Add jitter (±25% randomness) to avoid thundering herd
+ let mut rng = rand::rng();
+ let jitter = rng.random_range(0.75..=1.25);
+ let final_ms = ((capped_ms as f64) * jitter) as u64;
+
+ tokio::time::Duration::from_millis(final_ms)
+}
+
+/// Result of a fetch operation containing file path and size
+#[derive(Debug)]
+pub struct FetchResult {
+ pub file_path: PathBuf,
+ pub file_size: usize,
+}
+
+/// Trait for fetching remote log segments (allows dependency injection for
testing)
+pub trait RemoteLogFetcher: Send + Sync {
+ fn fetch(
+ &self,
+ request: &RemoteLogDownloadRequest,
+ ) -> Pin<Box<dyn Future<Output = Result<FetchResult>> + Send>>;
+}
/// Represents a remote log segment that needs to be downloaded
#[derive(Debug, Clone)]
@@ -40,6 +103,7 @@ pub struct RemoteLogSegment {
#[allow(dead_code)]
pub size_in_bytes: i32,
pub table_bucket: TableBucket,
+ pub max_timestamp: i64,
}
impl RemoteLogSegment {
@@ -50,6 +114,9 @@ impl RemoteLogSegment {
end_offset: segment.remote_log_end_offset,
size_in_bytes: segment.segment_size_in_bytes,
table_bucket,
+ // Match Java's behavior: use -1 for missing timestamp
+ // (Java: CommonRpcMessageUtils.java:171-174)
+ max_timestamp: segment.max_timestamp.unwrap_or(-1),
}
}
@@ -88,17 +155,473 @@ impl RemoteLogFetchInfo {
}
}
+/// RAII guard for prefetch permit that notifies coordinator on drop
+///
+/// NOTE: File deletion is now handled by FileSource::drop(), not here.
+/// This ensures the file is closed before deletion
+#[derive(Debug)]
+pub struct PrefetchPermit {
+ permit: Option<OwnedSemaphorePermit>,
+ recycle_notify: Arc<Notify>,
+}
+
+impl PrefetchPermit {
+ fn new(permit: OwnedSemaphorePermit, recycle_notify: Arc<Notify>) -> Self {
+ Self {
+ permit: Some(permit),
+ recycle_notify,
+ }
+ }
+}
+
+impl Drop for PrefetchPermit {
+ fn drop(&mut self) {
+ // Release capacity (critical: permit must be dropped before notify)
+ let _ = self.permit.take(); // drops permit here
+
+ // Then wake coordinator so it can acquire the now-available permit
+ self.recycle_notify.notify_one();
+ }
+}
+
+/// Downloaded remote log file with prefetch permit
+/// File remains on disk for memory efficiency; file deletion is handled by
FileCleanupGuard in FileSource
+#[derive(Debug)]
+pub struct RemoteLogFile {
+ /// Path to the downloaded file on local disk
+ pub file_path: PathBuf,
+ /// Size of the file in bytes
+ /// Currently unused but kept for potential future use (logging, metrics,
etc.)
+ #[allow(dead_code)]
+ pub file_size: usize,
+ /// RAII permit that releases prefetch semaphore slot and notifies
coordinator when dropped
+ pub permit: PrefetchPermit,
+}
+
+/// Represents a request to download a remote log segment with priority
ordering
+#[derive(Debug)]
+pub struct RemoteLogDownloadRequest {
+ segment: RemoteLogSegment,
+ remote_log_tablet_dir: String,
+ result_sender: oneshot::Sender<Result<RemoteLogFile>>,
+ retry_count: u32,
+ next_retry_at: Option<tokio::time::Instant>,
+}
+
+impl RemoteLogDownloadRequest {
+ /// Get the segment (used by test fetcher implementations)
+ #[cfg(test)]
+ pub fn segment(&self) -> &RemoteLogSegment {
+ &self.segment
+ }
+}
+
+// Total ordering for priority queue (Rust requirement: cmp==Equal implies Eq)
+// Primary: Java semantics (timestamp cross-bucket, offset within-bucket)
+// Tie-breakers: table_bucket fields (table_id, partition_id, bucket_id), then
segment_id
+impl Ord for RemoteLogDownloadRequest {
+ fn cmp(&self, other: &Self) -> Ordering {
+ if self.segment.table_bucket == other.segment.table_bucket {
+ // Same bucket: order by start_offset (ascending - earlier
segments first)
+ self.segment
+ .start_offset
+ .cmp(&other.segment.start_offset)
+ .then_with(||
self.segment.segment_id.cmp(&other.segment.segment_id))
+ } else {
+ // Different buckets: order by max_timestamp (ascending - older
segments first)
+ // Then by table_bucket fields for true total ordering
+ self.segment
+ .max_timestamp
+ .cmp(&other.segment.max_timestamp)
+ .then_with(|| {
+ self.segment
+ .table_bucket
+ .table_id()
+ .cmp(&other.segment.table_bucket.table_id())
+ })
+ .then_with(|| {
+ self.segment
+ .table_bucket
+ .partition_id()
+ .cmp(&other.segment.table_bucket.partition_id())
+ })
+ .then_with(|| {
+ self.segment
+ .table_bucket
+ .bucket_id()
+ .cmp(&other.segment.table_bucket.bucket_id())
+ })
+ .then_with(||
self.segment.segment_id.cmp(&other.segment.segment_id))
+ }
+ }
+}
+
+impl PartialOrd for RemoteLogDownloadRequest {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+impl PartialEq for RemoteLogDownloadRequest {
+ fn eq(&self, other: &Self) -> bool {
+ self.cmp(other) == Ordering::Equal
+ }
+}
+
+impl Eq for RemoteLogDownloadRequest {}
+
+/// Result of a download task
+enum DownloadResult {
+ /// Successful download - deliver result to future
+ Success {
+ result: RemoteLogFile,
+ result_sender: oneshot::Sender<Result<RemoteLogFile>>,
+ },
+ /// Download failed - re-queue request for retry (Java pattern)
+ FailedRetry { request: RemoteLogDownloadRequest },
+ /// Download failed permanently after max retries - fail the future
+ FailedPermanently {
+ error: Error,
+ result_sender: oneshot::Sender<Result<RemoteLogFile>>,
+ },
+ /// Cancelled - don't deliver, don't re-queue
+ Cancelled,
+}
+
+/// Production implementation of RemoteLogFetcher that downloads from actual
storage
+struct ProductionFetcher {
+ remote_fs_props: Arc<RwLock<HashMap<String, String>>>,
+ local_log_dir: Arc<TempDir>,
+}
+
+impl RemoteLogFetcher for ProductionFetcher {
+ fn fetch(
+ &self,
+ request: &RemoteLogDownloadRequest,
+ ) -> Pin<Box<dyn Future<Output = Result<FetchResult>> + Send>> {
+ let remote_fs_props = self.remote_fs_props.clone();
+ let local_log_dir = self.local_log_dir.clone();
+
+ // Clone data needed for async operation to avoid lifetime issues
+ let segment = request.segment.clone();
+ let remote_log_tablet_dir = request.remote_log_tablet_dir.to_string();
+
+ Box::pin(async move {
+ let local_file_name = segment.local_file_name();
+ let local_file_path = local_log_dir.path().join(&local_file_name);
+
+ // Build remote path
+ let offset_prefix = format!("{:020}", segment.start_offset);
+ let remote_path = format!(
+ "{}/{}/{}.log",
+ remote_log_tablet_dir, segment.segment_id, offset_prefix
+ );
+
+ let remote_fs_props_map = remote_fs_props.read().clone();
+
+ // Download file to disk (streaming, no memory spike)
+ let file_path = RemoteLogDownloader::download_file(
+ &remote_log_tablet_dir,
+ &remote_path,
+ &local_file_path,
+ &remote_fs_props_map,
+ )
+ .await?;
+
+ // Get file size
+ let metadata = tokio::fs::metadata(&file_path).await?;
+ let file_size = metadata.len() as usize;
+
+ // Return file path - file stays on disk until PrefetchPermit is
dropped
+ Ok(FetchResult {
+ file_path,
+ file_size,
+ })
+ })
+ }
+}
+
+/// Coordinator that owns all download state and orchestrates downloads
+struct DownloadCoordinator {
+ download_queue: BinaryHeap<Reverse<RemoteLogDownloadRequest>>,
+ active_downloads: JoinSet<DownloadResult>,
+ in_flight: usize,
+ prefetch_semaphore: Arc<Semaphore>,
+ max_concurrent_downloads: usize,
+ recycle_notify: Arc<Notify>,
+ fetcher: Arc<dyn RemoteLogFetcher>,
+}
+
+impl DownloadCoordinator {
+ /// Check if we should wait for recycle notification
+ /// Only wait if we're blocked on permits AND have pending work
+ fn should_wait_for_recycle(&self) -> bool {
+ !self.download_queue.is_empty()
+ && self.in_flight < self.max_concurrent_downloads
+ && self.prefetch_semaphore.available_permits() == 0
+ }
+
+ /// Find the earliest retry deadline among pending requests
+ fn next_retry_deadline(&self) -> Option<tokio::time::Instant> {
+ self.download_queue
+ .iter()
+ .filter_map(|Reverse(req)| req.next_retry_at)
+ .min()
+ }
+}
+
+impl DownloadCoordinator {
+ /// Try to start as many downloads as possible (event-driven drain)
+ fn drain(&mut self) {
+ // Collect deferred requests (backoff not ready) to push back later
+ let mut deferred = Vec::new();
+ // Scan entire queue once to find ready requests (prevents
head-of-line blocking)
+ // Bound to reasonable max to avoid excessive work if queue is huge
+ let max_scan = self.download_queue.len().min(100);
+ let mut scanned = 0;
+
+ while !self.download_queue.is_empty()
+ && self.in_flight < self.max_concurrent_downloads
+ && scanned < max_scan
+ {
+ // Try acquire prefetch permit (non-blocking)
+ let permit = match
self.prefetch_semaphore.clone().try_acquire_owned() {
+ Ok(p) => p,
+ Err(_) => break, // No permits available
+ };
+
+ // Pop highest priority request
+ let Some(Reverse(request)) = self.download_queue.pop() else {
+ drop(permit);
+ break;
+ };
+
+ scanned += 1;
+
+ // Retry backoff check: defer if retry time hasn't arrived yet
+ if let Some(next_retry_at) = request.next_retry_at {
+ let now = tokio::time::Instant::now();
+ if next_retry_at > now {
+ // Not ready for retry yet - defer and continue looking
for ready requests
+ drop(permit);
+ deferred.push(request);
+ continue; // Don't block - keep looking for ready requests
+ }
+ }
+
+ // Cancellation check: skip if sender closed
+ if request.result_sender.is_closed() {
+ drop(permit);
+ continue; // Try next request
+ }
+
+ // Clone data for the spawned task
+ let fetcher = self.fetcher.clone();
+ let recycle_notify = self.recycle_notify.clone();
+
+ // Spawn download task
+ self.active_downloads.spawn(async move {
+ spawn_download_task(request, permit, fetcher,
recycle_notify).await
+ });
+ self.in_flight += 1;
+ }
+
+ // Push deferred requests back to queue (maintains priority order)
+ if !deferred.is_empty() {
+ for req in deferred {
+ self.download_queue.push(Reverse(req));
+ }
+ }
+ }
+}
+
+/// Spawn a download task that attempts download once
+/// Matches Java's RemoteLogDownloader.java
+///
+/// Benefits over infinite in-place retry:
+/// - Failed downloads don't block prefetch slots
+/// - Other segments can make progress while one is failing
+/// - Natural retry through coordinator re-picking from queue
+async fn spawn_download_task(
+ request: RemoteLogDownloadRequest,
+ permit: tokio::sync::OwnedSemaphorePermit,
+ fetcher: Arc<dyn RemoteLogFetcher>,
+ recycle_notify: Arc<Notify>,
+) -> DownloadResult {
+ // Check if receiver still alive (early cancellation check)
+ if request.result_sender.is_closed() {
+ drop(permit);
+ return DownloadResult::Cancelled;
+ }
+
+ // Try download ONCE
+ let download_result = fetcher.fetch(&request).await;
+
+ match download_result {
+ Ok(fetch_result) => {
+ // Success - permit will be released on drop (FileSource handles
file deletion)
+ DownloadResult::Success {
+ result: RemoteLogFile {
+ file_path: fetch_result.file_path,
+ file_size: fetch_result.file_size,
+ permit: PrefetchPermit::new(permit,
recycle_notify.clone()),
+ },
+ result_sender: request.result_sender,
+ }
+ }
+ Err(e) if request.result_sender.is_closed() => {
+ // Receiver dropped (cancelled) - release permit, don't re-queue
+ drop(permit);
+ DownloadResult::Cancelled
+ }
+ Err(e) => {
+ // Download failed - check if we should retry or give up
+ let retry_count = request.retry_count + 1;
+
+ if retry_count > MAX_RETRY_COUNT {
+ // Too many retries - give up and fail the future
+ log::error!(
+ "Failed to download remote log segment {} after {}
retries: {}. Giving up.",
+ request.segment.segment_id,
+ retry_count,
+ e
+ );
+ drop(permit); // Release immediately
+
+ DownloadResult::FailedPermanently {
+ error: Error::UnexpectedError {
+ message: format!(
+ "Failed to download remote log segment after {}
retries: {}",
+ retry_count, e
+ ),
+ source: Some(Box::new(e)),
+ },
+ result_sender: request.result_sender,
+ }
+ } else {
+ // Retry with exponential backoff
+ let backoff_delay = calculate_backoff_delay(retry_count);
+ let next_retry_at = tokio::time::Instant::now() +
backoff_delay;
+
+ log::warn!(
+ "Failed to download remote log segment {}: {}. Retry {}/{}
after {:?}",
+ request.segment.segment_id,
+ e,
+ retry_count,
+ MAX_RETRY_COUNT,
+ backoff_delay
+ );
+ drop(permit); // Release immediately - critical!
+
+ // Update retry state
+ let mut retry_request = request;
+ retry_request.retry_count = retry_count;
+ retry_request.next_retry_at = Some(next_retry_at);
+
+ // Re-queue request to same priority queue
+ // Future stays with request, NOT completed - will complete on
successful retry
+ DownloadResult::FailedRetry {
+ request: retry_request,
+ }
+ }
+ }
+ }
+}
+
+/// Coordinator event loop - owns all download state and reacts to events
+async fn coordinator_loop(
+ mut coordinator: DownloadCoordinator,
+ mut request_receiver: mpsc::UnboundedReceiver<RemoteLogDownloadRequest>,
+) {
+ loop {
+ // Drain once at start of iteration to process ready work
+ coordinator.drain();
+
+ // Calculate sleep duration until next retry (if any deferred requests)
+ let next_retry_sleep =
coordinator.next_retry_deadline().map(|deadline| {
+ let now = tokio::time::Instant::now();
+ if deadline > now {
+ deadline - now
+ } else {
+ tokio::time::Duration::from_millis(0) // Ready now
+ }
+ });
+
+ tokio::select! {
+ // Event 1: NewRequest
+ Some(request) = request_receiver.recv() => {
+ coordinator.download_queue.push(Reverse(request));
+ // Immediately try to start this download
+ continue;
+ }
+
+ // Event 2: DownloadFinished
+ Some(result) = coordinator.active_downloads.join_next() => {
+ coordinator.in_flight -= 1;
+
+ match result {
+ Ok(DownloadResult::Success { result, result_sender }) => {
+ // Success - deliver result to future
+ if !result_sender.is_closed() {
+ let _ = result_sender.send(Ok(result));
+ }
+ // Permit held in RemoteLogFile until consumed
+ }
+ Ok(DownloadResult::FailedRetry { request }) => {
+ // Re-queue immediately (don't block coordinator with
sleep)
+ // The retry time will be checked in drain() before
processing
+ // (Java line 177: segmentsToFetch.add(request))
+ // Permit already released (Java line 174)
+ coordinator.download_queue.push(Reverse(request));
+ }
+ Ok(DownloadResult::FailedPermanently { error,
result_sender }) => {
+ // Permanent failure - deliver error to future
+ if !result_sender.is_closed() {
+ let _ = result_sender.send(Err(error));
+ }
+ // Permit already released
+ }
+ Ok(DownloadResult::Cancelled) => {
+ // Cancelled - permit already released, nothing to do
+ }
+ Err(e) => {
+ log::error!("Download task panicked: {:?}", e);
+ // Permit already released via RAII
+ }
+ }
+ // Immediately try to start another download
+ continue;
+ }
+
+ // Event 3: Recycled (only wait when blocked on permits with
pending work)
+ _ = coordinator.recycle_notify.notified(),
+ if coordinator.should_wait_for_recycle() => {
+ // Wake up to try draining
+ continue;
+ }
+
+ // Event 4: Retry timer - wake up when next retry is ready
+ _ =
tokio::time::sleep(next_retry_sleep.unwrap_or(tokio::time::Duration::from_secs(3600))),
+ if next_retry_sleep.is_some() => {
+ // Wake up to retry deferred requests
+ continue;
+ }
+
+ else => break, // All channels closed AND no work pending
+ }
+ }
+}
+
type CompletionCallback = Box<dyn Fn() + Send + Sync>;
/// Future for a remote log download request
pub struct RemoteLogDownloadFuture {
- result: Arc<Mutex<Option<Result<Vec<u8>>>>>,
+ result: Arc<Mutex<Option<Result<RemoteLogFile>>>>,
completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>>,
- // todo: add recycleCallback
}
impl RemoteLogDownloadFuture {
- pub fn new(receiver: oneshot::Receiver<Result<Vec<u8>>>) -> Self {
+ pub fn new(receiver: oneshot::Receiver<Result<RemoteLogFile>>) -> Self {
let result = Arc::new(Mutex::new(None));
let result_clone = Arc::clone(&result);
let completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>> =
@@ -123,7 +646,7 @@ impl RemoteLogDownloadFuture {
// This also ensures that any callbacks registered after this
point will be called immediately
let callbacks: Vec<CompletionCallback> = {
let mut callbacks_guard = callbacks_clone.lock();
- std::mem::take(&mut *callbacks_guard)
+ mem::take(&mut *callbacks_guard)
};
for callback in callbacks {
callback();
@@ -172,40 +695,90 @@ impl RemoteLogDownloadFuture {
self.result.lock().is_some()
}
- /// Get the downloaded file path (synchronous, only works after is_done()
returns true)
- pub fn get_remote_log_bytes(&self) -> Result<Vec<u8>> {
- // todo: handle download fail
- let guard = self.result.lock();
- match guard.as_ref() {
- Some(Ok(path)) => Ok(path.clone()),
- Some(Err(e)) => Err(Error::IoUnexpectedError {
- message: format!("Fail to get remote log bytes: {e}"),
- source: io::Error::other(format!("{e:?}")),
- }),
+ /// Take the RemoteLogFile (including the permit) from this future
+ /// This should only be called when the download is complete
+ /// This is the correct way to consume the download - it transfers permit
ownership
+ pub fn take_remote_log_file(&self) -> Result<RemoteLogFile> {
+ let mut guard = self.result.lock();
+ match guard.take() {
+ Some(Ok(remote_log_file)) => Ok(remote_log_file),
+ Some(Err(e)) => {
+ let error_msg = format!("{e}");
+ Err(Error::IoUnexpectedError {
+ message: format!("Fail to get remote log file:
{error_msg}"),
+ source: io::Error::other(error_msg),
+ })
+ }
None => Err(Error::IoUnexpectedError {
- message: "Get remote log bytes not completed yet".to_string(),
- source: io::Error::other("Get remote log bytes not completed
yet"),
+ message: "Remote log file already taken or not
ready".to_string(),
+ source: io::Error::other("Remote log file already taken or not
ready"),
}),
}
}
}
-/// Downloader for remote log segment files
+/// Downloader for remote log segment files.
+///
+/// # Shutdown behavior
+///
+/// When the downloader is dropped, the request channel closes, signaling the
coordinator
+/// to stop accepting new work. The coordinator will finish any in-flight
downloads but
+/// won't wait for completion. Pending futures will fail.
pub struct RemoteLogDownloader {
- local_log_dir: TempDir,
- remote_fs_props: RwLock<HashMap<String, String>>,
+ request_sender: Option<mpsc::UnboundedSender<RemoteLogDownloadRequest>>,
+ remote_fs_props: Option<Arc<RwLock<HashMap<String, String>>>>,
}
impl RemoteLogDownloader {
- pub fn new(local_log_dir: TempDir) -> Result<Self> {
+ pub fn new(
+ local_log_dir: TempDir,
+ max_prefetch_segments: usize,
+ max_concurrent_downloads: usize,
+ ) -> Result<Self> {
+ let remote_fs_props = Arc::new(RwLock::new(HashMap::new()));
+ let fetcher = Arc::new(ProductionFetcher {
+ remote_fs_props: remote_fs_props.clone(),
+ local_log_dir: Arc::new(local_log_dir),
+ });
+
+ let mut downloader =
+ Self::new_with_fetcher(fetcher, max_prefetch_segments,
max_concurrent_downloads)?;
+ downloader.remote_fs_props = Some(remote_fs_props);
+ Ok(downloader)
+ }
+
+ /// Create a RemoteLogDownloader with a custom fetcher (for testing).
+ /// The remote_fs_props will be None since custom fetchers typically don't
need S3 credentials.
+ pub fn new_with_fetcher(
+ fetcher: Arc<dyn RemoteLogFetcher>,
+ max_prefetch_segments: usize,
+ max_concurrent_downloads: usize,
+ ) -> Result<Self> {
+ let (request_sender, request_receiver) = mpsc::unbounded_channel();
+
+ let coordinator = DownloadCoordinator {
+ download_queue: BinaryHeap::new(),
+ active_downloads: JoinSet::new(),
+ in_flight: 0,
+ prefetch_semaphore:
Arc::new(Semaphore::new(max_prefetch_segments)),
+ max_concurrent_downloads,
+ recycle_notify: Arc::new(Notify::new()),
+ fetcher,
+ };
+
+ // Spawn coordinator task - it will exit when request_sender is dropped
+ tokio::spawn(coordinator_loop(coordinator, request_receiver));
+
Ok(Self {
- local_log_dir,
- remote_fs_props: RwLock::new(HashMap::new()),
+ request_sender: Some(request_sender),
+ remote_fs_props: None,
})
}
pub fn set_remote_fs_props(&self, props: HashMap<String, String>) {
- *self.remote_fs_props.write() = props;
+ if let Some(ref remote_fs_props) = self.remote_fs_props {
+ *remote_fs_props.write() = props;
+ }
}
/// Request to fetch a remote log segment to local. This method is
non-blocking.
@@ -214,49 +787,44 @@ impl RemoteLogDownloader {
remote_log_tablet_dir: &str,
segment: &RemoteLogSegment,
) -> RemoteLogDownloadFuture {
- let (sender, receiver) = oneshot::channel();
- let local_file_name = segment.local_file_name();
- let local_file_path = self.local_log_dir.path().join(&local_file_name);
- let remote_path = self.build_remote_path(remote_log_tablet_dir,
segment);
- let remote_log_tablet_dir = remote_log_tablet_dir.to_string();
- let remote_fs_props = self.remote_fs_props.read().clone();
- // Spawn async download & read task
- tokio::spawn(async move {
- let result = async {
- let file_path = Self::download_file(
- &remote_log_tablet_dir,
- &remote_path,
- &local_file_path,
- &remote_fs_props,
- )
- .await?;
- let bytes = tokio::fs::read(&file_path).await?;
-
- // Delete the downloaded local file to free disk (async, but
we'll do it in background)
- let file_path_clone = file_path.clone();
- tokio::spawn(async move {
- let _ = delete_file(file_path_clone).await;
- });
-
- Ok(bytes)
+ let (result_sender, result_receiver) = oneshot::channel();
+
+ let request = RemoteLogDownloadRequest {
+ segment: segment.clone(),
+ remote_log_tablet_dir: remote_log_tablet_dir.to_string(),
+ result_sender,
+ retry_count: 0,
+ next_retry_at: None,
+ };
+
+ // Send to coordinator (non-blocking)
+ if let Some(ref sender) = self.request_sender {
+ if sender.send(request).is_err() {
+ // Coordinator is gone - immediately fail the future
+ let (error_sender, error_receiver) = oneshot::channel();
+ let _ = error_sender.send(Err(Error::UnexpectedError {
+ message: "RemoteLogDownloader coordinator has shut
down".to_string(),
+ source: None,
+ }));
+ return RemoteLogDownloadFuture::new(error_receiver);
}
- .await;
+ }
- let _ = sender.send(result);
- });
- RemoteLogDownloadFuture::new(receiver)
+ RemoteLogDownloadFuture::new(result_receiver)
}
+}
- /// Build the remote path for a log segment
- fn build_remote_path(&self, remote_log_tablet_dir: &str, segment:
&RemoteLogSegment) -> String {
- // Format: ${remote_log_tablet_dir}/${segment_id}/${offset_prefix}.log
- let offset_prefix = format!("{:020}", segment.start_offset);
- format!(
- "{}/{}/{}.log",
- remote_log_tablet_dir, segment.segment_id, offset_prefix
- )
+/// Signals the download coordinator to shut down when the downloader is dropped.
+impl Drop for RemoteLogDownloader {
+ fn drop(&mut self) {
+ // Dropping the only request sender (created in `new_with_fetcher`) closes
+ // the request channel, so the coordinator's `request_receiver.recv()`
+ // returns `None` and its loop can exit. Whether pending work is finished
+ // first depends on `coordinator_loop` (not shown here); this method does
+ // not wait for the coordinator task.
+ drop(self.request_sender.take());
}
+}
+impl RemoteLogDownloader {
/// Download a file from remote storage to local using streaming read/write
async fn download_file(
remote_log_tablet_dir: &str,
@@ -293,7 +861,7 @@ impl RemoteLogDownloader {
let (op, relative_path) = storage.create(remote_path)?;
// Timeout for remote storage operations (30 seconds)
- const REMOTE_OP_TIMEOUT: std::time::Duration =
std::time::Duration::from_secs(30);
+ const REMOTE_OP_TIMEOUT: Duration = Duration::from_secs(30);
// Get file metadata to know the size with timeout
let meta = op.stat(relative_path).await?;
@@ -310,7 +878,7 @@ impl RemoteLogDownloader {
let total_chunks = file_size.div_ceil(CHUNK_SIZE);
while offset < file_size {
- let end = std::cmp::min(offset + CHUNK_SIZE, file_size);
+ let end = min(offset + CHUNK_SIZE, file_size);
let range = offset..end;
chunk_count += 1;
@@ -347,70 +915,349 @@ impl RemoteLogDownloader {
}
}
-/// Pending fetch that waits for remote log file to be downloaded
-pub struct RemotePendingFetch {
- segment: RemoteLogSegment,
- download_future: RemoteLogDownloadFuture,
- pos_in_log_segment: i32,
- fetch_offset: i64,
- high_watermark: i64,
- read_context: ReadContext,
-}
+// Unit tests for the download coordinator: priority ordering, concurrency and
+// prefetch limits, retry and cancellation. Several tests rely on fixed sleeps
+// (10-50 ms) and may be timing-sensitive on a heavily loaded machine.
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::sync::atomic::{AtomicUsize, Ordering};
-impl RemotePendingFetch {
- pub fn new(
- segment: RemoteLogSegment,
- download_future: RemoteLogDownloadFuture,
- pos_in_log_segment: i32,
- fetch_offset: i64,
- high_watermark: i64,
- read_context: ReadContext,
- ) -> Self {
- Self {
+ /// Helper function to create a TableBucket for testing
+ fn create_table_bucket(table_id: i64, bucket_id: i32) -> TableBucket {
+ TableBucket::new(table_id, bucket_id)
+ }
+
+ /// Fake `RemoteLogFetcher` for exercising the coordinator without remote storage.
+ ///
+ /// Each fetch increments `in_flight` (recording the peak in
+ /// `max_seen_in_flight`), then either waits on `completion_gate`
+ /// (`auto_complete == false`) or just yields once. The first `fail_count`
+ /// fetches return an error; later ones write a small temp file and return it.
+ struct FakeFetcher {
+ completion_gate: Arc<Notify>,
+ in_flight: Arc<AtomicUsize>,
+ max_seen_in_flight: Arc<AtomicUsize>,
+ fail_count: Arc<Mutex<usize>>,
+ auto_complete: bool,
+ }
+
+ impl FakeFetcher {
+ fn new(fail_count: usize, auto_complete: bool) -> Self {
+ Self {
+ completion_gate: Arc::new(Notify::new()),
+ in_flight: Arc::new(AtomicUsize::new(0)),
+ max_seen_in_flight: Arc::new(AtomicUsize::new(0)),
+ fail_count: Arc::new(Mutex::new(fail_count)),
+ auto_complete,
+ }
+ }
+
+ fn max_seen_in_flight(&self) -> usize {
+ self.max_seen_in_flight.load(Ordering::SeqCst)
+ }
+
+ fn in_flight(&self) -> usize {
+ self.in_flight.load(Ordering::SeqCst)
+ }
+
+ /// Lets one gated fetch finish. `Notify::notify_one` stores a permit when
+ /// no fetch is waiting yet, so the release is not lost.
+ fn release_one(&self) {
+ self.completion_gate.notify_one();
+ }
+
+ /// Wakes every fetch currently waiting on the gate. `notify_waiters` stores
+ /// no permit, so fetches that start waiting afterwards are not released.
+ fn release_all(&self) {
+ self.completion_gate.notify_waiters();
+ }
+ }
+
+ impl RemoteLogFetcher for FakeFetcher {
+ fn fetch(
+ &self,
+ request: &RemoteLogDownloadRequest,
+ ) -> Pin<Box<dyn Future<Output = Result<FetchResult>> + Send>> {
+ let gate = self.completion_gate.clone();
+ let in_flight = self.in_flight.clone();
+ let max_seen = self.max_seen_in_flight.clone();
+ let fail_count = self.fail_count.clone();
+ let segment_id = request.segment().segment_id.clone();
+ let auto_complete = self.auto_complete;
+
+ Box::pin(async move {
+ // Track in-flight
+ let current = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
+ max_seen.fetch_max(current, Ordering::SeqCst);
+
+ // Wait for gate (or auto-complete)
+ if !auto_complete {
+ gate.notified().await;
+ } else {
+ tokio::task::yield_now().await;
+ }
+
+ // Check if should fail
+ let should_fail = {
+ let mut count = fail_count.lock();
+ if *count > 0 {
+ *count -= 1;
+ true
+ } else {
+ false
+ }
+ };
+
+ in_flight.fetch_sub(1, Ordering::SeqCst);
+
+ if should_fail {
+ Err(Error::UnexpectedError {
+ message: format!("Fake fetch failed for {}",
segment_id),
+ source: None,
+ })
+ } else {
+ let fake_data = vec![1, 2, 3, 4];
+ let temp_dir = env::temp_dir();
+ // Timestamp suffix presumably keeps file names distinct across
+ // repeated fetches (e.g. retries) of the same segment id.
+ let timestamp = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_nanos();
+ let file_path =
+ temp_dir.join(format!("fake_segment_{}_{}.log",
segment_id, timestamp));
+ // NOTE(review): nothing in this module deletes these files; unless
+ // the downloader or its consumer removes them, they accumulate in
+ // the system temp dir.
+ tokio::fs::write(&file_path, &fake_data).await?;
+
+ Ok(FetchResult {
+ file_path,
+ file_size: fake_data.len(),
+ })
+ }
+ })
+ }
+ }
+
+ /// Helper function to create a RemoteLogSegment for testing
+ fn create_segment(
+ segment_id: &str,
+ start_offset: i64,
+ max_timestamp: i64,
+ table_bucket: TableBucket,
+ ) -> RemoteLogSegment {
+ RemoteLogSegment {
+ segment_id: segment_id.to_string(),
+ start_offset,
+ end_offset: start_offset + 1000,
+ size_in_bytes: 1024,
+ table_bucket,
+ max_timestamp,
+ }
+ }
+
+ /// Helper function to create a RemoteLogDownloadRequest for testing
+ fn create_request(segment: RemoteLogSegment) -> RemoteLogDownloadRequest {
+ // The receiver is dropped at once: these requests are only ordered in a
+ // heap by the tests, never fulfilled.
+ let (result_sender, _) = oneshot::channel();
+ RemoteLogDownloadRequest {
+ remote_log_tablet_dir: "test_dir".to_string(),
segment,
- download_future,
- pos_in_log_segment,
- fetch_offset,
- high_watermark,
- read_context,
+ result_sender,
+ retry_count: 0,
+ next_retry_at: None,
}
}
-}
-impl PendingFetch for RemotePendingFetch {
- fn table_bucket(&self) -> &TableBucket {
- &self.segment.table_bucket
+ #[test]
+ fn test_priority_ordering_matching_java_test_case() {
+ // Test priority ordering: timestamp across buckets, offset within bucket
+ // Does NOT test tie-breakers (segment_id) - those are implementation details
+
+ let bucket1 = create_table_bucket(1, 0);
+ let bucket2 = create_table_bucket(1, 1);
+ let bucket3 = create_table_bucket(1, 2);
+ let bucket4 = create_table_bucket(1, 3);
+
+ // Create segments with distinct timestamps/offsets (no ties)
+ let seg_negative = create_segment("seg_neg", 0, -1, bucket1.clone());
+ let seg_zero = create_segment("seg_zero", 0, 0, bucket2.clone());
+ let seg_1000 = create_segment("seg_1000", 0, 1000, bucket3.clone());
+ let seg_2000 = create_segment("seg_2000", 0, 2000, bucket4.clone());
+ let seg_same_bucket_100 = create_segment("seg_sb_100", 100, 5000,
bucket1.clone());
+ let seg_same_bucket_50 = create_segment("seg_sb_50", 50, 5000,
bucket1.clone());
+
+ // `Reverse` turns the max-heap into a min-heap: the smallest request pops first.
+ let mut heap = BinaryHeap::new();
+ heap.push(Reverse(create_request(seg_2000)));
+ heap.push(Reverse(create_request(seg_same_bucket_100)));
+ heap.push(Reverse(create_request(seg_1000)));
+ heap.push(Reverse(create_request(seg_zero)));
+ heap.push(Reverse(create_request(seg_negative)));
+ heap.push(Reverse(create_request(seg_same_bucket_50)));
+
+ // Verify ordering by timestamp/offset, not segment_id
+ let first = heap.pop().unwrap().0;
+ assert_eq!(first.segment.max_timestamp, -1, "Lowest timestamp first");
+
+ let second = heap.pop().unwrap().0;
+ assert_eq!(second.segment.max_timestamp, 0);
+
+ let third = heap.pop().unwrap().0;
+ assert_eq!(third.segment.max_timestamp, 1000);
+
+ let fourth = heap.pop().unwrap().0;
+ assert_eq!(fourth.segment.max_timestamp, 2000);
+
+ // Last two are same bucket (ts=5000), ordered by offset
+ let fifth = heap.pop().unwrap().0;
+ assert_eq!(fifth.segment.max_timestamp, 5000);
+ assert_eq!(
+ fifth.segment.start_offset, 50,
+ "Lower offset first within bucket"
+ );
+
+ let sixth = heap.pop().unwrap().0;
+ assert_eq!(sixth.segment.max_timestamp, 5000);
+ assert_eq!(sixth.segment.start_offset, 100);
}
- fn is_completed(&self) -> bool {
- self.download_future.is_done()
+ #[tokio::test]
+ async fn test_concurrency_and_priority() {
+ // Test concurrency limiting and priority-based scheduling together
+ // NOTE(review): all four segments share one timestamp and bucket, so despite
+ // the name this only exercises the concurrency limit, not priority order.
+ let fake_fetcher = Arc::new(FakeFetcher::new(0, false)); // Manual control
+
+ let downloader = RemoteLogDownloader::new_with_fetcher(
+ fake_fetcher.clone(),
+ 10, // High prefetch limit
+ 2, // Max concurrent downloads = 2
+ )
+ .unwrap();
+
+ let bucket = create_table_bucket(1, 0);
+
+ // Request 4 segments with same priority (to isolate concurrency limiting from priority)
+ let segs: Vec<_> = (0..4)
+ .map(|i| create_segment(&format!("seg{}", i), i * 100, 1000,
bucket.clone()))
+ .collect();
+
+ // Keep the futures alive: dropping them cancels the requests (see
+ // test_retry_and_cancellation).
+ let _futures: Vec<_> = segs
+ .iter()
+ .map(|seg| downloader.request_remote_log("dir", seg))
+ .collect();
+
+ // Wait for exactly 2 to start
+ tokio::time::sleep(Duration::from_millis(50)).await;
+ assert_eq!(
+ fake_fetcher.in_flight(),
+ 2,
+ "Concurrency limit: exactly 2 should be in-flight"
+ );
+
+ // Release one
+ fake_fetcher.release_one();
+ tokio::time::sleep(Duration::from_millis(50)).await;
+
+ // Max should never exceed 2
+ assert_eq!(
+ fake_fetcher.max_seen_in_flight(),
+ 2,
+ "Max concurrent should not exceed 2"
+ );
+
+ // Release all
+ fake_fetcher.release_all();
}
- fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
- // Get the file path (this should only be called when is_completed()
returns true)
- let mut data = self.download_future.get_remote_log_bytes()?;
+ #[tokio::test]
+ async fn test_prefetch_limit() {
+ // Test that prefetch semaphore limits outstanding downloads
+ let fake_fetcher = Arc::new(FakeFetcher::new(0, true)); // Auto-complete
- // Slice the data if needed
- let data = if self.pos_in_log_segment > 0 {
- data.split_off(self.pos_in_log_segment as usize)
- } else {
- data
- };
+ let downloader = RemoteLogDownloader::new_with_fetcher(
+ fake_fetcher,
+ 2, // Max prefetch = 2
+ 10, // High concurrent limit
+ )
+ .unwrap();
+
+ let bucket = create_table_bucket(1, 0);
- let size_in_bytes = data.len();
+ // Request 4 downloads
+ let segs: Vec<_> = (0..4)
+ .map(|i| create_segment(&format!("seg{}", i), i * 100, 1000,
bucket.clone()))
+ .collect();
- let log_record_batch = LogRecordsBatches::new(data);
+ let mut futures: Vec<_> = segs
+ .iter()
+ .map(|seg| downloader.request_remote_log("dir", seg))
+ .collect();
- // Create DefaultCompletedFetch from the data
- let completed_fetch = DefaultCompletedFetch::new(
- self.segment.table_bucket,
- log_record_batch,
- size_in_bytes,
- self.read_context,
- self.fetch_offset,
- self.high_watermark,
+ // Wait for first 2 to complete
+ let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
+ loop {
+ if futures.iter().filter(|f| f.is_done()).count() >= 2 {
+ break;
+ }
+ if tokio::time::Instant::now() > deadline {
+ panic!("Timeout waiting for first 2 downloads");
+ }
+ tokio::time::sleep(Duration::from_millis(10)).await;
+ }
+
+ // Verify 3rd and 4th are blocked (prefetch limit)
+ tokio::time::sleep(Duration::from_millis(50)).await;
+ assert_eq!(
+ futures.iter().filter(|f| f.is_done()).count(),
+ 2,
+ "Prefetch limit: only 2 should complete"
);
- Ok(Box::new(completed_fetch))
+ // Drop the first two futures: f4 and f3 are popped off the end, so
+ // `futures` then holds only requests 1 and 2. Presumably each completed
+ // future holds a prefetch permit until it is dropped.
+ let f4 = futures.pop().unwrap();
+ let f3 = futures.pop().unwrap();
+ drop(futures);
+
+ // 3rd and 4th should now complete
+ let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
+ loop {
+ if f3.is_done() && f4.is_done() {
+ break;
+ }
+ if tokio::time::Instant::now() > deadline {
+ panic!("Timeout after permit release");
+ }
+ tokio::time::sleep(Duration::from_millis(10)).await;
+ }
+ }
+
+ #[tokio::test]
+ async fn test_retry_and_cancellation() {
+ // Test retry: the first two fetch attempts fail (backoff timing is not asserted)
+ let fake_fetcher = Arc::new(FakeFetcher::new(2, true)); // Fail twice, succeed third time
+
+ let downloader =
+ RemoteLogDownloader::new_with_fetcher(fake_fetcher.clone(), 10,
1).unwrap();
+
+ let bucket = create_table_bucket(1, 0);
+ let seg = create_segment("seg1", 0, 1000, bucket);
+
+ let future = downloader.request_remote_log("dir", &seg);
+
+ // Should succeed after retries
+ let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
+ loop {
+ if future.is_done() {
+ break;
+ }
+ if tokio::time::Instant::now() > deadline {
+ panic!("Timeout waiting for retry to succeed");
+ }
+ tokio::time::sleep(Duration::from_millis(50)).await;
+ }
+
+ // NOTE(review): presumably `is_done()` is also true when the download failed;
+ // asserting on the actual result would prove the retries succeeded.
+ assert!(future.is_done(), "Should succeed after retries");
+
+ // Test cancellation
+ let seg2 = create_segment("seg2", 100, 1000, create_table_bucket(1,
0));
+ let fake_fetcher2 = Arc::new(FakeFetcher::new(100, true)); // Fails the first 100 attempts (never succeeds within this test)
+ let downloader2 =
+ RemoteLogDownloader::new_with_fetcher(fake_fetcher2.clone(), 10,
1).unwrap();
+
+ let future2 = downloader2.request_remote_log("dir", &seg2);
+ tokio::time::sleep(Duration::from_millis(50)).await;
+
+ // Drop to cancel
+ drop(future2);
+ tokio::time::sleep(Duration::from_millis(50)).await;
+
+ // NOTE(review): with `auto_complete = true` each attempt finishes after a
+ // single yield, so `in_flight` is usually 0 between retries regardless of
+ // cancellation; this is weak evidence that dropping the future cancels it.
+ assert_eq!(
+ fake_fetcher2.in_flight(),
+ 0,
+ "Cancellation should release resources"
+ );
}
}
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index afa44f3..8712650 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -19,10 +19,12 @@ use arrow::array::RecordBatch;
use arrow_schema::SchemaRef;
use log::{debug, warn};
use parking_lot::{Mutex, RwLock};
-use std::collections::{HashMap, HashSet};
-use std::slice::from_ref;
-use std::sync::Arc;
-use std::time::Duration;
+use std::{
+ collections::{HashMap, HashSet},
+ slice::from_ref,
+ sync::Arc,
+ time::{Duration, Instant},
+};
use tempfile::TempDir;
use crate::client::connection::FlussConnection;
@@ -30,11 +32,9 @@ use crate::client::credentials::CredentialsCache;
use crate::client::metadata::Metadata;
use crate::client::table::log_fetch_buffer::{
CompletedFetch, DefaultCompletedFetch, FetchErrorAction,
FetchErrorContext, FetchErrorLogLevel,
- LogFetchBuffer,
-};
-use crate::client::table::remote_log::{
- RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
+ LogFetchBuffer, RemotePendingFetch,
};
+use crate::client::table::remote_log::{RemoteLogDownloader,
RemoteLogFetchInfo};
use crate::error::{ApiError, Error, FlussError, Result};
use crate::metadata::{PhysicalTablePath, TableBucket, TableInfo, TablePath};
use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket,
PbFetchLogReqForTable};
@@ -223,6 +223,7 @@ impl<'a> TableScan<'a> {
&self.table_info,
self.metadata.clone(),
self.conn.get_connections(),
+ self.conn.config(),
self.projected_fields,
)?;
Ok(LogScanner {
@@ -235,6 +236,7 @@ impl<'a> TableScan<'a> {
&self.table_info,
self.metadata.clone(),
self.conn.get_connections(),
+ self.conn.config(),
self.projected_fields,
)?;
Ok(RecordBatchLogScanner {
@@ -273,6 +275,7 @@ impl LogScannerInner {
table_info: &TableInfo,
metadata: Arc<Metadata>,
connections: Arc<RpcClient>,
+ config: &crate::config::Config,
projected_fields: Option<Vec<usize>>,
) -> Result<Self> {
let log_scanner_status = Arc::new(LogScannerStatus::new());
@@ -286,13 +289,14 @@ impl LogScannerInner {
connections.clone(),
metadata.clone(),
log_scanner_status.clone(),
+ config,
projected_fields,
)?,
})
}
async fn poll_records(&self, timeout: Duration) -> Result<ScanRecords> {
- let start = std::time::Instant::now();
+ let start = Instant::now();
let deadline = start + timeout;
loop {
@@ -307,7 +311,7 @@ impl LogScannerInner {
}
// No data available, check if we should wait
- let now = std::time::Instant::now();
+ let now = Instant::now();
if now >= deadline {
// Timeout reached, return empty result
return Ok(ScanRecords::new(HashMap::new()));
@@ -376,7 +380,7 @@ impl LogScannerInner {
}
async fn poll_batches(&self, timeout: Duration) ->
Result<Vec<RecordBatch>> {
- let start = std::time::Instant::now();
+ let start = Instant::now();
let deadline = start + timeout;
loop {
@@ -387,7 +391,7 @@ impl LogScannerInner {
return Ok(batches);
}
- let now = std::time::Instant::now();
+ let now = Instant::now();
if now >= deadline {
return Ok(Vec::new());
}
@@ -478,6 +482,7 @@ impl LogFetcher {
conns: Arc<RpcClient>,
metadata: Arc<Metadata>,
log_scanner_status: Arc<LogScannerStatus>,
+ config: &crate::config::Config,
projected_fields: Option<Vec<usize>>,
) -> Result<Self> {
let full_arrow_schema = to_arrow_schema(table_info.get_row_type())?;
@@ -497,7 +502,11 @@ impl LogFetcher {
log_scanner_status,
read_context,
remote_read_context,
- remote_log_downloader:
Arc::new(RemoteLogDownloader::new(tmp_dir)?),
+ remote_log_downloader: Arc::new(RemoteLogDownloader::new(
+ tmp_dir,
+ config.scanner_remote_log_prefetch_num,
+ config.scanner_remote_log_download_threads,
+ )?),
credentials_cache: Arc::new(CredentialsCache::new(conns.clone(),
metadata.clone())),
log_fetch_buffer,
nodes_with_pending_fetch_requests:
Arc::new(Mutex::new(HashSet::new())),
@@ -1510,6 +1519,7 @@ mod tests {
Arc::new(RpcClient::new()),
metadata,
status.clone(),
+ &crate::config::Config::default(),
None,
)?;
@@ -1529,8 +1539,8 @@ mod tests {
Ok(())
}
- #[test]
- fn fetch_records_from_fetch_drains_unassigned_bucket() -> Result<()> {
+ #[tokio::test]
+ async fn fetch_records_from_fetch_drains_unassigned_bucket() -> Result<()>
{
let table_path = TablePath::new("db".to_string(), "tbl".to_string());
let table_info = build_table_info(table_path.clone(), 1, 1);
let cluster = build_cluster_arc(&table_path, 1, 1);
@@ -1541,6 +1551,7 @@ mod tests {
Arc::new(RpcClient::new()),
metadata,
status,
+ &crate::config::Config::default(),
None,
)?;
@@ -1576,6 +1587,7 @@ mod tests {
Arc::new(RpcClient::new()),
metadata,
status,
+ &crate::config::Config::default(),
None,
)?;
@@ -1599,6 +1611,7 @@ mod tests {
Arc::new(RpcClient::new()),
metadata.clone(),
status.clone(),
+ &crate::config::Config::default(),
None,
)?;
@@ -1649,6 +1662,7 @@ mod tests {
Arc::new(RpcClient::new()),
metadata.clone(),
status.clone(),
+ &crate::config::Config::default(),
None,
)?;
diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs
index 92f600e..705e241 100644
--- a/crates/fluss/src/config.rs
+++ b/crates/fluss/src/config.rs
@@ -36,6 +36,16 @@ pub struct Config {
#[arg(long, default_value_t = 2 * 1024 * 1024)]
pub writer_batch_size: i32,
+
+ /// Maximum number of remote log segments to prefetch
+ /// Default: 4 (matching Java CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM)
+ #[arg(long, default_value_t = 4)]
+ pub scanner_remote_log_prefetch_num: usize,
+
+ /// Maximum concurrent remote log downloads
+ /// Default: 3 (matching Java REMOTE_FILE_DOWNLOAD_THREAD_NUM)
+ #[arg(long, default_value_t = 3)]
+ pub scanner_remote_log_download_threads: usize,
}
impl Default for Config {
@@ -46,6 +56,8 @@ impl Default for Config {
writer_acks: String::from("all"),
writer_retries: i32::MAX,
writer_batch_size: 2 * 1024 * 1024,
+ scanner_remote_log_prefetch_num: 4,
+ scanner_remote_log_download_threads: 3,
}
}
}
diff --git a/crates/fluss/src/proto/fluss_api.proto b/crates/fluss/src/proto/fluss_api.proto
index eaee94c..65eddce 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -306,6 +306,7 @@ message PbRemoteLogSegment {
required int64 remote_log_start_offset = 2;
required int64 remote_log_end_offset = 3;
required int32 segment_size_in_bytes = 4;
+ optional int64 max_timestamp = 5;
}
message PbListOffsetsRespForBucket {
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 39114d3..4bfdc71 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -46,7 +46,10 @@ use byteorder::{ByteOrder, LittleEndian};
use bytes::Bytes;
use crc32c::crc32c;
use std::{
- io::{Cursor, Write},
+ collections::HashMap,
+ fs::File,
+ io::{Cursor, Read, Seek, SeekFrom, Write},
+ path::PathBuf,
sync::Arc,
};
@@ -82,6 +85,11 @@ pub const RECORD_BATCH_HEADER_SIZE: usize = RECORDS_OFFSET;
pub const ARROW_CHANGETYPE_OFFSET: usize = RECORD_BATCH_HEADER_SIZE;
pub const LOG_OVERHEAD: usize = LENGTH_OFFSET + LENGTH_LENGTH;
+/// Upper bound, in bytes, on a whole batch (the header's length-field value
+/// plus `LOG_OVERHEAD`), as checked by `validate_batch_size`.
+///
+/// Matches Java's `Integer.MAX_VALUE` limit: Java uses an `int` for batch
+/// sizes, so the maximum is 2^31 - 1 = 2,147,483,647 bytes (~2GB). This is the
+/// implicit limit in FileLogRecords.java and other Java components.
+pub const MAX_BATCH_SIZE: usize = i32::MAX as usize; // 2,147,483,647 bytes (~2GB)
+
/// const for record
/// The "magic" values.
#[derive(Debug, Clone, Copy)]
@@ -89,6 +97,49 @@ pub enum LogMagicValue {
V0 = 0,
}
+/// Validate a raw batch length read from a batch header and return the total
+/// number of bytes the batch occupies, i.e. `batch_size_bytes + LOG_OVERHEAD`.
+///
+/// Callers store the result as their batch size, so it already includes
+/// `LOG_OVERHEAD`.
+///
+/// # Errors
+///
+/// Returns `Error::UnexpectedError` when:
+/// - `batch_size_bytes` is negative (corrupted data);
+/// - `batch_size_bytes + LOG_OVERHEAD` overflows `usize`;
+/// - the total exceeds `MAX_BATCH_SIZE`.
+fn validate_batch_size(batch_size_bytes: i32) -> Result<usize> {
+ // Check for negative size (corrupted data)
+ if batch_size_bytes < 0 {
+ return Err(Error::UnexpectedError {
+ message: format!("Invalid negative batch size: {}",
batch_size_bytes),
+ source: None,
+ });
+ }
+
+ // Cannot wrap on 32/64-bit targets: the value was just checked to be non-negative.
+ let batch_size_u = batch_size_bytes as usize;
+
+ // Check for overflow when adding LOG_OVERHEAD
+ let total_size =
+ batch_size_u
+ .checked_add(LOG_OVERHEAD)
+ .ok_or_else(|| Error::UnexpectedError {
+ message: format!(
+ "Batch size {} + LOG_OVERHEAD {} would overflow",
+ batch_size_u, LOG_OVERHEAD
+ ),
+ source: None,
+ })?;
+
+ // Sanity check: reject unreasonably large batches. The size reported in
+ // this error already includes LOG_OVERHEAD.
+ if total_size > MAX_BATCH_SIZE {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "Batch size {} exceeds maximum allowed size {}",
+ total_size, MAX_BATCH_SIZE
+ ),
+ source: None,
+ });
+ }
+
+ Ok(total_size)
+}
+
// NOTE: Rust layout/offsets currently match Java only for V0.
// TODO: Add V1 layout/offsets to keep parity with Java's V1 format.
pub const CURRENT_LOG_MAGIC_VALUE: u8 = LogMagicValue::V0 as u8;
@@ -441,53 +492,280 @@ pub trait ToArrow {
fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()>;
}
-pub struct LogRecordsBatches {
+/// Log record source backed by an in-memory buffer.
+/// Used for local tablet server fetches (existing path).
+struct MemorySource {
+ /// Whole record buffer; batches are handed out as zero-copy slices of it.
data: Bytes,
+}
+
+impl MemorySource {
+ /// Wraps `data` in a `Bytes` buffer so batches can later be sliced without copying.
+ fn new(data: Vec<u8>) -> Self {
+ Self {
+ data: Bytes::from(data),
+ }
+ }
+
+ /// Reads the header of the batch starting at `pos` and returns
+ /// `(base_offset, total_size)`, where `total_size` already includes
+ /// `LOG_OVERHEAD` (see `validate_batch_size`).
+ ///
+ /// Fails if fewer than `LOG_OVERHEAD` bytes remain at `pos` or the length
+ /// field is invalid. Takes `&mut self` only to match `FileSource`; nothing
+ /// is mutated here.
+ fn read_batch_header(&mut self, pos: usize) -> Result<(i64, usize)> {
+ if pos + LOG_OVERHEAD > self.data.len() {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "Position {} + LOG_OVERHEAD {} exceeds data size {}",
+ pos,
+ LOG_OVERHEAD,
+ self.data.len()
+ ),
+ source: None,
+ });
+ }
+
+ // Assumes both header fields lie within the LOG_OVERHEAD bytes checked
+ // above - TODO confirm against BASE_OFFSET_OFFSET / LENGTH_OFFSET.
+ let base_offset = LittleEndian::read_i64(&self.data[pos +
BASE_OFFSET_OFFSET..]);
+ let batch_size_bytes = LittleEndian::read_i32(&self.data[pos +
LENGTH_OFFSET..]);
+
+ // Validate batch size to prevent integer overflow and corruption
+ let batch_size = validate_batch_size(batch_size_bytes)?;
+
+ Ok((base_offset, batch_size))
+ }
+
+ /// Returns the `size` bytes starting at `pos`, or an error if that range
+ /// runs past the end of the buffer.
+ fn read_batch_data(&mut self, pos: usize, size: usize) -> Result<Bytes> {
+ if pos + size > self.data.len() {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "Read beyond data size: {} + {} > {}",
+ pos,
+ size,
+ self.data.len()
+ ),
+ source: None,
+ });
+ }
+ // Zero-copy slice (Bytes is Arc-based)
+ Ok(self.data.slice(pos..pos + size))
+ }
+
+ /// Number of bytes in the buffer.
+ fn total_size(&self) -> usize {
+ self.data.len()
+ }
+}
+
+/// Guard that deletes `file_path` when dropped.
+///
+/// `FileSource` declares it after its `File` field. Struct fields are dropped
+/// in declaration order, so the handle is closed before the file is removed.
+struct FileCleanupGuard {
+ /// Path of the file to delete on drop.
+ file_path: PathBuf,
+}
+
+impl Drop for FileCleanupGuard {
+ /// Deletes the file, logging (not propagating) any failure.
+ fn drop(&mut self) {
+ // When owned by `FileSource`, the file handle is already closed here:
+ // `_cleanup` is declared after `file`, and fields drop in declaration order.
+ // NOTE(review): `std::fs::remove_file` is blocking; if this guard is dropped
+ // on an async executor thread, the deletion runs synchronously there.
+ if let Err(e) = std::fs::remove_file(&self.file_path) {
+ log::warn!(
+ "Failed to delete remote log file {}: {}",
+ self.file_path.display(),
+ e
+ );
+ } else {
+ log::debug!("Deleted remote log file: {}",
self.file_path.display());
+ }
+ }
+}
+
+/// File-backed log record source.
+/// Used for remote log segments downloaded to local disk.
+/// Reads each batch on demand instead of loading the entire file into memory;
+/// every `read_batch_data` call copies that batch into a fresh buffer.
+///
+/// Uses seek + read_exact for cross-platform compatibility.
+/// Access pattern is sequential iteration (single consumer).
+struct FileSource {
+ /// Open handle to the segment file.
+ file: File,
+ /// File length in bytes, captured once in `FileSource::new`.
+ file_size: usize,
+ /// Byte offset in the file where readable log data starts; positions passed
+ /// to the read methods are relative to it.
+ base_offset: usize,
+ /// Deletes the file on drop. Must stay declared AFTER `file`: fields drop in
+ /// declaration order, so the handle is closed before the file is removed.
+ _cleanup: Option<FileCleanupGuard>,
+}
+
+impl FileSource {
+ /// Create a new FileSource over `file`, treating byte `base_offset` as the
+ /// start of the log data.
+ ///
+ /// On success, the file at `file_path` will be deleted when this FileSource
+ /// is dropped.
+ ///
+ /// # Errors
+ ///
+ /// Fails if the file metadata cannot be read or `base_offset` exceeds the
+ /// file size. No cleanup guard exists yet in that case, so the file at
+ /// `file_path` is NOT deleted by this type.
+ fn new(file: File, base_offset: usize, file_path: PathBuf) -> Result<Self>
{
+ // NOTE(review): `u64 as usize` truncates on 32-bit targets for files of
+ // 4 GiB or more; `usize::try_from` would surface that as an error.
+ let file_size = file.metadata()?.len() as usize;
+
+ // Validate base_offset to prevent underflow in total_size()
+ if base_offset > file_size {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "base_offset ({}) exceeds file_size ({})",
+ base_offset, file_size
+ ),
+ source: None,
+ });
+ }
+
+ Ok(Self {
+ file,
+ file_size,
+ base_offset,
+ _cleanup: Some(FileCleanupGuard { file_path }),
+ })
+ }
+
+ /// Read data at a specific position using seek + read_exact.
+ /// This is cross-platform and adequate for sequential access patterns.
+ /// `pos` is an absolute file offset (callers add `base_offset` themselves).
+ fn read_at(&mut self, pos: u64, buf: &mut [u8]) -> Result<()> {
+ self.file.seek(SeekFrom::Start(pos))?;
+ self.file.read_exact(buf)?;
+ Ok(())
+ }
+
+ /// Reads only the `LOG_OVERHEAD` header bytes of the batch at `pos`
+ /// (relative to `base_offset`) and returns `(base_offset, total_size)`,
+ /// where `total_size` already includes `LOG_OVERHEAD`.
+ fn read_batch_header(&mut self, pos: usize) -> Result<(i64, usize)> {
+ let actual_pos = self.base_offset + pos;
+ if actual_pos + LOG_OVERHEAD > self.file_size {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "Position {} exceeds file size {}",
+ actual_pos, self.file_size
+ ),
+ source: None,
+ });
+ }
+
+ // Read only the header to extract base_offset and batch_size
+ // NOTE(review): LOG_OVERHEAD is a constant, so a stack array
+ // (`[0u8; LOG_OVERHEAD]`) would avoid this per-call heap allocation.
+ let mut header_buf = vec![0u8; LOG_OVERHEAD];
+ self.read_at(actual_pos as u64, &mut header_buf)?;
+
+ let base_offset =
LittleEndian::read_i64(&header_buf[BASE_OFFSET_OFFSET..]);
+ let batch_size_bytes =
LittleEndian::read_i32(&header_buf[LENGTH_OFFSET..]);
+
+ // Validate batch size to prevent integer overflow and corruption
+ let batch_size = validate_batch_size(batch_size_bytes)?;
+
+ Ok((base_offset, batch_size))
+ }
+
+ /// Reads `size` bytes at `pos` (relative to `base_offset`) into a newly
+ /// allocated buffer, or errors if that range runs past the end of the file.
+ fn read_batch_data(&mut self, pos: usize, size: usize) -> Result<Bytes> {
+ let actual_pos = self.base_offset + pos;
+ if actual_pos + size > self.file_size {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "Read beyond file size: {} + {} > {}",
+ actual_pos, size, self.file_size
+ ),
+ source: None,
+ });
+ }
+
+ // Read the full batch data
+ let mut batch_buf = vec![0u8; size];
+ self.read_at(actual_pos as u64, &mut batch_buf)?;
+
+ Ok(Bytes::from(batch_buf))
+ }
+
+ /// Bytes of log data after `base_offset`. Cannot underflow because `new`
+ /// rejects `base_offset > file_size`.
+ fn total_size(&self) -> usize {
+ self.file_size - self.base_offset
+ }
+}
+
+/// Backing storage for `LogRecordsBatches`: either an in-memory buffer or a
+/// local file. Using an enum keeps dispatch static (no trait object).
+enum LogRecordsSource {
+ /// Batches are zero-copy slices of an in-memory buffer.
+ Memory(MemorySource),
+ /// Batches are read from disk on demand.
+ File(FileSource),
+}
+
+/// Each method forwards to the same-named method of the active variant.
+impl LogRecordsSource {
+ /// Returns `(base_offset, total_size)` of the batch whose header starts at `pos`.
+ fn read_batch_header(&mut self, pos: usize) -> Result<(i64, usize)> {
+ match self {
+ Self::Memory(s) => s.read_batch_header(pos),
+ Self::File(s) => s.read_batch_header(pos),
+ }
+ }
+
+ /// Returns the `size` bytes starting at `pos`.
+ fn read_batch_data(&mut self, pos: usize, size: usize) -> Result<Bytes> {
+ match self {
+ Self::Memory(s) => s.read_batch_data(pos, size),
+ Self::File(s) => s.read_batch_data(pos, size),
+ }
+ }
+
+ /// Number of readable bytes in the source.
+ fn total_size(&self) -> usize {
+ match self {
+ Self::Memory(s) => s.total_size(),
+ Self::File(s) => s.total_size(),
+ }
+ }
+}
+
+/// Iterator over the log record batches in an in-memory or file-backed source.
+///
+/// Each item is a `Result` because reading a batch can fail (I/O errors,
+/// corrupted headers).
+pub struct LogRecordsBatches {
+ /// Where batch bytes are read from.
+ source: LogRecordsSource,
+ /// Position of the next batch, relative to the start of the source's data.
current_pos: usize,
+ /// Bytes left between `current_pos` and the end of the source's data.
remaining_bytes: usize,
}
impl LogRecordsBatches {
+ /// Create from an in-memory `Vec` (existing path - backward compatible).
+ ///
+ /// Batches produced from this source are zero-copy slices of `data`.
pub fn new(data: Vec<u8>) -> Self {
- let remaining_bytes: usize = data.len();
+ let source = LogRecordsSource::Memory(MemorySource::new(data));
+ let remaining_bytes = source.total_size();
Self {
- data: Bytes::from(data),
+ source,
current_pos: 0,
remaining_bytes,
}
}
- pub fn next_batch_size(&self) -> Option<usize> {
+ /// Create from a local file, reading batches on demand instead of loading
+ /// the whole file into memory. Byte `base_offset` of the file is treated as
+ /// the start of the log data.
+ ///
+ /// On success the file at `file_path` is deleted when the returned value is
+ /// dropped, after its handle has been closed.
+ ///
+ /// # Errors
+ ///
+ /// Fails if the file metadata cannot be read or `base_offset` exceeds the
+ /// file size; the file is not deleted in that case.
+ pub fn from_file(file: File, base_offset: usize, file_path: PathBuf) ->
Result<Self> {
+ let source = FileSource::new(file, base_offset, file_path)?;
+ let remaining_bytes = source.total_size();
+ Ok(Self {
+ source: LogRecordsSource::File(source),
+ current_pos: 0,
+ remaining_bytes,
+ })
+ }
+
+ /// Returns the total size (including `LOG_OVERHEAD`) of the next batch.
+ ///
+ /// `Ok(None)` means no complete batch remains. Either fewer than
+ /// `LOG_OVERHEAD` bytes are left, or the header announces a batch longer than
+ /// the remaining data; a truncated trailing batch is treated as end of data,
+ /// not as an error. `Err` is returned for read failures and invalid length
+ /// fields.
+ ///
+ /// NOTE(review): this method used to be `pub fn next_batch_size(&self) ->
+ /// Option<usize>`; any callers outside this module no longer compile.
+ fn next_batch_size(&mut self) -> Result<Option<usize>> {
if self.remaining_bytes < LOG_OVERHEAD {
- return None;
+ return Ok(None);
}
- let batch_size_bytes =
- LittleEndian::read_i32(self.data.get(self.current_pos +
LENGTH_OFFSET..).unwrap());
- let batch_size = batch_size_bytes as usize + LOG_OVERHEAD;
- if batch_size > self.remaining_bytes {
- return None;
+ // Read only header to get size
+ match self.source.read_batch_header(self.current_pos) {
+ Ok((_base_offset, batch_size)) => {
+ if batch_size > self.remaining_bytes {
+ Ok(None)
+ } else {
+ Ok(Some(batch_size))
+ }
+ }
+ Err(e) => Err(e),
}
- Some(batch_size)
}
}
+/// Yields each complete batch in order. The position advances only after a
+/// batch has been read successfully.
impl Iterator for LogRecordsBatches {
- type Item = LogRecordBatch;
+ type Item = Result<LogRecordBatch>;
fn next(&mut self) -> Option<Self::Item> {
+ // NOTE(review): on `Err`, neither `current_pos` nor `remaining_bytes` is
+ // advanced. Calling `next()` again re-reads the same position and will
+ // typically yield the same error forever, so a consumer that logs and
+ // continues would loop. Consider fusing the iterator on error (e.g.
+ // `self.remaining_bytes = 0`) or documenting that callers must stop.
match self.next_batch_size() {
- Some(batch_size) => {
- let start = self.current_pos;
- let end = start + batch_size;
- // Since LogRecordsBatches owns the Vec<u8>, the slice is valid
- // as long as the mutable reference exists, which is 'a
- let record_batch =
LogRecordBatch::new(self.data.slice(start..end));
- self.current_pos += batch_size;
- self.remaining_bytes -= batch_size;
- Some(record_batch)
+ Ok(Some(batch_size)) => {
+ // Read full batch data on-demand
+ match self.source.read_batch_data(self.current_pos,
batch_size) {
+ Ok(data) => {
+ let record_batch = LogRecordBatch::new(data);
+ self.current_pos += batch_size;
+ self.remaining_bytes -= batch_size;
+ Some(Ok(record_batch))
+ }
+ Err(e) => Some(Err(e)),
+ }
}
- None => None,
+ Ok(None) => None,
+ Err(e) => Some(Err(e)),
}
}
}
@@ -1012,7 +1290,7 @@ impl ReadContext {
&body_buffer,
batch_metadata,
resolve_schema,
- &std::collections::HashMap::new(),
+ &HashMap::new(),
None,
&version,
)?;
@@ -1052,7 +1330,7 @@ impl ReadContext {
&body_buffer,
batch_metadata,
self.full_schema.clone(),
- &std::collections::HashMap::new(),
+ &HashMap::new(),
None,
&version,
)?;
@@ -1157,7 +1435,7 @@ pub struct MyVec<T>(pub StreamReader<T>);
#[cfg(test)]
mod tests {
use super::*;
- use crate::metadata::{DataField, DataTypes};
+ use crate::metadata::{DataField, DataTypes, RowType};
#[test]
fn test_to_array_type() {
@@ -1456,6 +1734,49 @@ mod tests {
Ok(())
}
+ // Tests for file-backed streaming
+
+ /// `FileSource` reads byte ranges relative to its base offset and reports the
+ /// size of the data that follows that offset.
+ #[test]
+ fn test_file_source_streaming() -> Result<()> {
+ use tempfile::NamedTempFile;
+
+ // Case 1: with base_offset 0, positions address the file from its first byte.
+ let test_data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
+ let mut tmp_file = NamedTempFile::new()?;
+ tmp_file.write_all(&test_data)?;
+ tmp_file.flush()?;
+
+ let file_path = tmp_file.path().to_path_buf();
+ let file = File::open(&file_path)?;
+ let mut source = FileSource::new(file, 0, file_path)?;
+ assert_eq!(source.total_size(), 10);
+
+ // Read full data
+ let data = source.read_batch_data(0, 10)?;
+ assert_eq!(data.to_vec(), test_data);
+
+ // Read a slice from the middle (position 2, length 5)
+ let partial = source.read_batch_data(2, 5)?;
+ assert_eq!(partial.to_vec(), vec![3, 4, 5, 6, 7]);
+
+ // Case 2: a non-zero base_offset (critical for remote logs with
+ // pos_in_log_segment) skips a prefix; positions are relative to the offset.
+ let prefix = vec![0xFF; 100];
+ let actual_data = vec![1, 2, 3, 4, 5];
+ let mut tmp_file2 = NamedTempFile::new()?;
+ tmp_file2.write_all(&prefix)?;
+ tmp_file2.write_all(&actual_data)?;
+ tmp_file2.flush()?;
+
+ let file_path2 = tmp_file2.path().to_path_buf();
+ let file2 = File::open(&file_path2)?;
+ // Skip the first 100 bytes
+ let mut source2 = FileSource::new(file2, 100, file_path2)?;
+
+ assert_eq!(source2.total_size(), 5); // Only counts data after offset
+ let data2 = source2.read_batch_data(0, 5)?;
+ assert_eq!(data2.to_vec(), actual_data);
+
+ Ok(())
+ }
+
#[test]
fn test_all_types_end_to_end() -> Result<()> {
use crate::row::{Date, Datum, Decimal, GenericRow, Time, TimestampLtz,
TimestampNtz};
@@ -1590,4 +1911,63 @@ mod tests {
Ok(())
}
+
+ /// A real Arrow log record batch written to disk is streamed back through the
+ /// file-backed `LogRecordsBatches`, and iteration ends after the last batch.
+ #[test]
+ fn test_log_records_batches_from_file() -> Result<()> {
+ use crate::client::WriteRecord;
+ use crate::compression::{
+ ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+ };
+ use crate::metadata::TablePath;
+ use crate::row::GenericRow;
+ use tempfile::NamedTempFile;
+
+ // Integration test: Real log record batch streamed from file
+ let row_type = RowType::new(vec![
+ DataField::new("id".to_string(), DataTypes::int(), None),
+ DataField::new("name".to_string(), DataTypes::string(), None),
+ ]);
+ let table_path = Arc::new(TablePath::new("db".to_string(), "tbl".to_string()));
+
+ let mut builder = MemoryLogRecordsArrowBuilder::new(
+ 1,
+ &row_type,
+ false,
+ ArrowCompressionInfo {
+ compression_type: ArrowCompressionType::None,
+ compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+ },
+ )?;
+
+ let mut row = GenericRow::new();
+ row.set_field(0, 1_i32);
+ row.set_field(1, "alice");
+ let record = WriteRecord::for_append(table_path.clone(), 1, row);
+ builder.append(&record)?;
+
+ let mut row2 = GenericRow::new();
+ row2.set_field(0, 2_i32);
+ row2.set_field(1, "bob");
+ let record2 = WriteRecord::for_append(table_path, 2, row2);
+ builder.append(&record2)?;
+
+ let data = builder.build()?;
+
+ // Write to file
+ let mut tmp_file = NamedTempFile::new()?;
+ tmp_file.write_all(&data)?;
+ tmp_file.flush()?;
+
+ // Create a file-backed LogRecordsBatches (should stream batches on demand
+ // rather than load the whole file into memory)
+ let file_path = tmp_file.path().to_path_buf();
+ let file = File::open(&file_path)?;
+ let mut batches = LogRecordsBatches::from_file(file, 0, file_path)?;
+
+ // Both rows went through a single builder, so they land in one batch.
+ let batch = batches.next().expect("Should have at least one batch")?;
+ assert!(batch.size_in_bytes() > 0);
+ assert_eq!(batch.record_count(), 2);
+
+ // Nothing follows that batch in the file, so iteration must end cleanly.
+ assert!(batches.next().is_none());
+
+ Ok(())
+ }
}
diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs
index 30424e5..156ef04 100644
--- a/crates/fluss/src/util/mod.rs
+++ b/crates/fluss/src/util/mod.rs
@@ -22,7 +22,6 @@ use crate::metadata::TableBucket;
use linked_hash_map::LinkedHashMap;
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
-use std::path::PathBuf;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
@@ -33,11 +32,9 @@ pub fn current_time_ms() -> i64 {
.as_millis() as i64
}
-pub async fn delete_file(file_path: PathBuf) {
- tokio::fs::remove_file(&file_path)
- .await
- .unwrap_or_else(|err| log::warn!("Could not delete file:
{file_path:?}, error: {err:?}"));
-}
+// Removed: delete_file() is no longer used.
+// File cleanup is now handled via RAII with FileCleanupGuard in arrow.rs
+// which uses Rust's drop order to ensure files are closed before deletion.
pub struct FairBucketStatusMap<S> {
map: LinkedHashMap<TableBucket, Arc<S>>,