This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ee9bfe  feat: introduce priority queue for downloading remote 
segments (#187)
7ee9bfe is described below

commit 7ee9bfe5b9fbef568b40553354cc81efd7071214
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Jan 25 01:09:12 2026 +0000

    feat: introduce priority queue for downloading remote segments (#187)
---
 crates/fluss/src/client/connection.rs             |    4 +
 crates/fluss/src/client/table/log_fetch_buffer.rs |  195 +++-
 crates/fluss/src/client/table/mod.rs              |    3 +
 crates/fluss/src/client/table/remote_log.rs       | 1087 ++++++++++++++++++---
 crates/fluss/src/client/table/scanner.rs          |   44 +-
 crates/fluss/src/config.rs                        |   12 +
 crates/fluss/src/proto/fluss_api.proto            |    1 +
 crates/fluss/src/record/arrow.rs                  |  432 +++++++-
 crates/fluss/src/util/mod.rs                      |    9 +-
 9 files changed, 1611 insertions(+), 176 deletions(-)

diff --git a/crates/fluss/src/client/connection.rs 
b/crates/fluss/src/client/connection.rs
index 595daf5..0e41bbe 100644
--- a/crates/fluss/src/client/connection.rs
+++ b/crates/fluss/src/client/connection.rs
@@ -59,6 +59,10 @@ impl FlussConnection {
         self.network_connects.clone()
     }
 
+    pub fn config(&self) -> &Config {
+        &self.args
+    }
+
     pub async fn get_admin(&self) -> Result<FlussAdmin> {
         FlussAdmin::new(self.network_connects.clone(), 
self.metadata.clone()).await
     }
diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs 
b/crates/fluss/src/client/table/log_fetch_buffer.rs
index 214a79c..4a64eda 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -18,15 +18,22 @@
 use arrow::array::RecordBatch;
 use parking_lot::Mutex;
 
+use crate::client::table::remote_log::{
+    PrefetchPermit, RemoteLogDownloadFuture, RemoteLogFile, RemoteLogSegment,
+};
 use crate::error::{ApiError, Error, Result};
 use crate::metadata::TableBucket;
 use crate::record::{
     LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext, 
ScanRecord,
 };
-use std::collections::{HashMap, VecDeque};
-use std::sync::Arc;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::time::Duration;
+use std::{
+    collections::{HashMap, VecDeque},
+    sync::{
+        Arc,
+        atomic::{AtomicBool, Ordering},
+    },
+    time::{Duration, Instant},
+};
 use tokio::sync::Notify;
 
 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -106,7 +113,7 @@ impl LogFetchBuffer {
     /// Wait for the buffer to become non-empty, with timeout.
     /// Returns true if data became available, false if timeout.
     pub async fn await_not_empty(&self, timeout: Duration) -> Result<bool> {
-        let deadline = std::time::Instant::now() + timeout;
+        let deadline = Instant::now() + timeout;
 
         loop {
             // Check if buffer is not empty
@@ -122,7 +129,7 @@ impl LogFetchBuffer {
             }
 
             // Check if timeout
-            let now = std::time::Instant::now();
+            let now = Instant::now();
             if now >= deadline {
                 return Ok(false);
             }
@@ -325,6 +332,7 @@ impl PendingFetch for CompletedPendingFetch {
 }
 
 /// Default implementation of CompletedFetch for in-memory log records
+/// Used for local fetches from tablet server
 pub struct DefaultCompletedFetch {
     table_bucket: TableBucket,
     api_error: Option<ApiError>,
@@ -441,7 +449,8 @@ impl DefaultCompletedFetch {
                 if record.offset() >= self.next_fetch_offset {
                     return Ok(Some(record));
                 }
-            } else if let Some(batch) = self.log_record_batch.next() {
+            } else if let Some(batch_result) = self.log_record_batch.next() {
+                let batch = batch_result?;
                 self.current_record_iterator = 
Some(batch.records(&self.read_context)?);
                 self.current_record_batch = Some(batch);
             } else {
@@ -470,11 +479,12 @@ impl DefaultCompletedFetch {
     /// Get the next batch directly without row iteration
     fn next_fetched_batch(&mut self) -> Result<Option<RecordBatch>> {
         loop {
-            let Some(log_batch) = self.log_record_batch.next() else {
+            let Some(log_batch_result) = self.log_record_batch.next() else {
                 self.drain();
                 return Ok(None);
             };
 
+            let log_batch = log_batch_result?;
             let mut record_batch = log_batch.record_batch(&self.read_context)?;
 
             // Skip empty batches
@@ -644,6 +654,174 @@ impl CompletedFetch for DefaultCompletedFetch {
     }
 }
 
+/// Completed fetch for remote log segments
+/// Matches Java's RemoteCompletedFetch design - separate class for remote vs 
local
+/// Holds RAII permit until consumed (data is in inner)
+pub struct RemoteCompletedFetch {
+    inner: DefaultCompletedFetch,
+    permit: Option<PrefetchPermit>,
+}
+
+impl RemoteCompletedFetch {
+    pub fn new(inner: DefaultCompletedFetch, permit: PrefetchPermit) -> Self {
+        Self {
+            inner,
+            permit: Some(permit),
+        }
+    }
+}
+
+impl CompletedFetch for RemoteCompletedFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        self.inner.table_bucket()
+    }
+
+    fn api_error(&self) -> Option<&ApiError> {
+        self.inner.api_error()
+    }
+
+    fn fetch_error_context(&self) -> Option<&FetchErrorContext> {
+        self.inner.fetch_error_context()
+    }
+
+    fn take_error(&mut self) -> Option<Error> {
+        self.inner.take_error()
+    }
+
+    fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>> 
{
+        self.inner.fetch_records(max_records)
+    }
+
+    fn fetch_batches(&mut self, max_batches: usize) -> 
Result<Vec<RecordBatch>> {
+        self.inner.fetch_batches(max_batches)
+    }
+
+    fn is_consumed(&self) -> bool {
+        self.inner.is_consumed()
+    }
+
+    fn records_read(&self) -> usize {
+        self.inner.records_read()
+    }
+
+    fn drain(&mut self) {
+        self.inner.drain();
+        // Release permit immediately (don't wait for struct drop)
+        // Critical: allows prefetch to continue even if Box<dyn 
CompletedFetch> kept around
+        self.permit.take(); // drops permit here, triggers recycle notification
+    }
+
+    fn size_in_bytes(&self) -> usize {
+        self.inner.size_in_bytes()
+    }
+
+    fn high_watermark(&self) -> i64 {
+        self.inner.high_watermark()
+    }
+
+    fn is_initialized(&self) -> bool {
+        self.inner.is_initialized()
+    }
+
+    fn set_initialized(&mut self) {
+        self.inner.set_initialized()
+    }
+
+    fn next_fetch_offset(&self) -> i64 {
+        self.inner.next_fetch_offset()
+    }
+}
+// Permit released explicitly in drain() or automatically when struct drops
+
+/// Pending fetch that waits for remote log file to be downloaded
+pub struct RemotePendingFetch {
+    segment: RemoteLogSegment,
+    download_future: RemoteLogDownloadFuture,
+    pos_in_log_segment: i32,
+    fetch_offset: i64,
+    high_watermark: i64,
+    read_context: ReadContext,
+}
+
+impl RemotePendingFetch {
+    pub fn new(
+        segment: RemoteLogSegment,
+        download_future: RemoteLogDownloadFuture,
+        pos_in_log_segment: i32,
+        fetch_offset: i64,
+        high_watermark: i64,
+        read_context: ReadContext,
+    ) -> Self {
+        Self {
+            segment,
+            download_future,
+            pos_in_log_segment,
+            fetch_offset,
+            high_watermark,
+            read_context,
+        }
+    }
+}
+
+impl PendingFetch for RemotePendingFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        &self.segment.table_bucket
+    }
+
+    fn is_completed(&self) -> bool {
+        self.download_future.is_done()
+    }
+
+    fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+        // Take the RemoteLogFile and destructure
+        let remote_log_file = self.download_future.take_remote_log_file()?;
+        let RemoteLogFile {
+            file_path,
+            file_size: _,
+            permit,
+        } = remote_log_file;
+
+        // Open file for streaming (no memory allocation for entire file)
+        let file = std::fs::File::open(&file_path)?;
+        let file_size = file.metadata()?.len() as usize;
+
+        // Create file-backed LogRecordsBatches with cleanup (streaming!)
+        // Data will be read batch-by-batch on-demand, not all at once
+        // FileSource will delete the file when dropped (after file is closed)
+        let log_record_batch =
+            LogRecordsBatches::from_file(file, self.pos_in_log_segment as 
usize, file_path)?;
+
+        // Calculate size based on position offset
+        let size_in_bytes = if self.pos_in_log_segment > 0 {
+            let pos = self.pos_in_log_segment as usize;
+            if pos >= file_size {
+                return Err(Error::UnexpectedError {
+                    message: format!("Position {} exceeds file size {}", pos, 
file_size),
+                    source: None,
+                });
+            }
+            file_size - pos
+        } else {
+            file_size
+        };
+
+        // Create DefaultCompletedFetch
+        let inner_fetch = DefaultCompletedFetch::new(
+            self.segment.table_bucket.clone(),
+            log_record_batch,
+            size_in_bytes,
+            self.read_context,
+            self.fetch_offset,
+            self.high_watermark,
+        );
+
+        // Wrap it with RemoteCompletedFetch to hold the permit
+        // Permit manages the prefetch slot (releases semaphore and notifies 
coordinator) when dropped;
+        // file deletion is handled by FileCleanupGuard in the file-backed 
source created via from_file
+        Ok(Box::new(RemoteCompletedFetch::new(inner_fetch, permit)))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -655,7 +833,6 @@ mod tests {
     use crate::record::{MemoryLogRecordsArrowBuilder, ReadContext, 
to_arrow_schema};
     use crate::row::GenericRow;
     use std::sync::Arc;
-    use std::time::Duration;
 
     fn test_read_context() -> Result<ReadContext> {
         let row_type = RowType::new(vec![DataField::new(
diff --git a/crates/fluss/src/client/table/mod.rs 
b/crates/fluss/src/client/table/mod.rs
index 2bfa054..2dc56d5 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -36,6 +36,9 @@ mod writer;
 use crate::client::table::upsert::TableUpsert;
 pub use append::{AppendWriter, TableAppend};
 pub use lookup::{LookupResult, Lookuper, TableLookup};
+pub use remote_log::{
+    DEFAULT_SCANNER_REMOTE_LOG_DOWNLOAD_THREADS, 
DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM,
+};
 pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
 pub use writer::{TableWriter, UpsertWriter};
 
diff --git a/crates/fluss/src/client/table/remote_log.rs 
b/crates/fluss/src/client/table/remote_log.rs
index 0142515..c39056d 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -14,21 +14,84 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use crate::client::table::log_fetch_buffer::{CompletedFetch, 
DefaultCompletedFetch, PendingFetch};
 use crate::error::{Error, Result};
 use crate::io::{FileIO, Storage};
 use crate::metadata::TableBucket;
 use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
-use crate::record::{LogRecordsBatches, ReadContext};
-use crate::util::delete_file;
 use parking_lot::{Mutex, RwLock};
-use std::collections::HashMap;
-use std::io;
-use std::path::{Path, PathBuf};
-use std::sync::Arc;
+use std::{
+    cmp::{Ordering, Reverse, min},
+    collections::{BinaryHeap, HashMap},
+    future::Future,
+    io, mem,
+    path::{Path, PathBuf},
+    pin::Pin,
+    sync::Arc,
+    time::Duration,
+};
+
+#[cfg(test)]
+use std::{
+    env,
+    time::{SystemTime, UNIX_EPOCH},
+};
 use tempfile::TempDir;
 use tokio::io::AsyncWriteExt;
-use tokio::sync::oneshot;
+use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
+use tokio::task::JoinSet;
+
+/// Default maximum number of remote log segments to prefetch
+/// Matches Java's CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM (default: 4)
+pub const DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM: usize = 4;
+
+/// Default maximum concurrent remote log downloads
+/// Matches Java's REMOTE_FILE_DOWNLOAD_THREAD_NUM (default: 3)
+pub const DEFAULT_SCANNER_REMOTE_LOG_DOWNLOAD_THREADS: usize = 3;
+
+/// Initial retry backoff delay (milliseconds)
+/// Prevents hot-spin retry loops on persistent failures
+const RETRY_BACKOFF_BASE_MS: u64 = 100;
+
+/// Maximum retry backoff delay (milliseconds)
+/// Caps exponential backoff to avoid excessive delays
+const RETRY_BACKOFF_MAX_MS: u64 = 5_000;
+
+/// Maximum number of retries before giving up
+/// After this many retries, the download will fail permanently
+const MAX_RETRY_COUNT: u32 = 10;
+
+/// Calculate exponential backoff delay with jitter for retries
+fn calculate_backoff_delay(retry_count: u32) -> tokio::time::Duration {
+    use rand::Rng;
+
+    // Exponential backoff: base * 2^retry_count
+    let exponential_ms = RETRY_BACKOFF_BASE_MS.saturating_mul(1 << 
retry_count.min(10)); // Cap exponent to prevent overflow
+
+    // Cap at maximum
+    let capped_ms = exponential_ms.min(RETRY_BACKOFF_MAX_MS);
+
+    // Add jitter (±25% randomness) to avoid thundering herd
+    let mut rng = rand::rng();
+    let jitter = rng.random_range(0.75..=1.25);
+    let final_ms = ((capped_ms as f64) * jitter) as u64;
+
+    tokio::time::Duration::from_millis(final_ms)
+}
+
+/// Result of a fetch operation containing file path and size
+#[derive(Debug)]
+pub struct FetchResult {
+    pub file_path: PathBuf,
+    pub file_size: usize,
+}
+
+/// Trait for fetching remote log segments (allows dependency injection for 
testing)
+pub trait RemoteLogFetcher: Send + Sync {
+    fn fetch(
+        &self,
+        request: &RemoteLogDownloadRequest,
+    ) -> Pin<Box<dyn Future<Output = Result<FetchResult>> + Send>>;
+}
 
 /// Represents a remote log segment that needs to be downloaded
 #[derive(Debug, Clone)]
@@ -40,6 +103,7 @@ pub struct RemoteLogSegment {
     #[allow(dead_code)]
     pub size_in_bytes: i32,
     pub table_bucket: TableBucket,
+    pub max_timestamp: i64,
 }
 
 impl RemoteLogSegment {
@@ -50,6 +114,9 @@ impl RemoteLogSegment {
             end_offset: segment.remote_log_end_offset,
             size_in_bytes: segment.segment_size_in_bytes,
             table_bucket,
+            // Match Java's behavior: use -1 for missing timestamp
+            // (Java: CommonRpcMessageUtils.java:171-174)
+            max_timestamp: segment.max_timestamp.unwrap_or(-1),
         }
     }
 
@@ -88,17 +155,473 @@ impl RemoteLogFetchInfo {
     }
 }
 
+/// RAII guard for prefetch permit that notifies coordinator on drop
+///
+/// NOTE: File deletion is now handled by FileSource::drop(), not here.
+/// This ensures the file is closed before deletion
+#[derive(Debug)]
+pub struct PrefetchPermit {
+    permit: Option<OwnedSemaphorePermit>,
+    recycle_notify: Arc<Notify>,
+}
+
+impl PrefetchPermit {
+    fn new(permit: OwnedSemaphorePermit, recycle_notify: Arc<Notify>) -> Self {
+        Self {
+            permit: Some(permit),
+            recycle_notify,
+        }
+    }
+}
+
+impl Drop for PrefetchPermit {
+    fn drop(&mut self) {
+        // Release capacity (critical: permit must be dropped before notify)
+        let _ = self.permit.take(); // drops permit here
+
+        // Then wake coordinator so it can acquire the now-available permit
+        self.recycle_notify.notify_one();
+    }
+}
+
+/// Downloaded remote log file with prefetch permit
+/// File remains on disk for memory efficiency; file deletion is handled by 
FileCleanupGuard in FileSource
+#[derive(Debug)]
+pub struct RemoteLogFile {
+    /// Path to the downloaded file on local disk
+    pub file_path: PathBuf,
+    /// Size of the file in bytes
+    /// Currently unused but kept for potential future use (logging, metrics, 
etc.)
+    #[allow(dead_code)]
+    pub file_size: usize,
+    /// RAII permit that releases prefetch semaphore slot and notifies 
coordinator when dropped
+    pub permit: PrefetchPermit,
+}
+
+/// Represents a request to download a remote log segment with priority 
ordering
+#[derive(Debug)]
+pub struct RemoteLogDownloadRequest {
+    segment: RemoteLogSegment,
+    remote_log_tablet_dir: String,
+    result_sender: oneshot::Sender<Result<RemoteLogFile>>,
+    retry_count: u32,
+    next_retry_at: Option<tokio::time::Instant>,
+}
+
+impl RemoteLogDownloadRequest {
+    /// Get the segment (used by test fetcher implementations)
+    #[cfg(test)]
+    pub fn segment(&self) -> &RemoteLogSegment {
+        &self.segment
+    }
+}
+
+// Total ordering for priority queue (Rust requirement: cmp==Equal implies Eq)
+// Primary: Java semantics (timestamp cross-bucket, offset within-bucket)
+// Tie-breakers: table_bucket fields (table_id, partition_id, bucket_id), then 
segment_id
+impl Ord for RemoteLogDownloadRequest {
+    fn cmp(&self, other: &Self) -> Ordering {
+        if self.segment.table_bucket == other.segment.table_bucket {
+            // Same bucket: order by start_offset (ascending - earlier 
segments first)
+            self.segment
+                .start_offset
+                .cmp(&other.segment.start_offset)
+                .then_with(|| 
self.segment.segment_id.cmp(&other.segment.segment_id))
+        } else {
+            // Different buckets: order by max_timestamp (ascending - older 
segments first)
+            // Then by table_bucket fields for true total ordering
+            self.segment
+                .max_timestamp
+                .cmp(&other.segment.max_timestamp)
+                .then_with(|| {
+                    self.segment
+                        .table_bucket
+                        .table_id()
+                        .cmp(&other.segment.table_bucket.table_id())
+                })
+                .then_with(|| {
+                    self.segment
+                        .table_bucket
+                        .partition_id()
+                        .cmp(&other.segment.table_bucket.partition_id())
+                })
+                .then_with(|| {
+                    self.segment
+                        .table_bucket
+                        .bucket_id()
+                        .cmp(&other.segment.table_bucket.bucket_id())
+                })
+                .then_with(|| 
self.segment.segment_id.cmp(&other.segment.segment_id))
+        }
+    }
+}
+
+impl PartialOrd for RemoteLogDownloadRequest {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl PartialEq for RemoteLogDownloadRequest {
+    fn eq(&self, other: &Self) -> bool {
+        self.cmp(other) == Ordering::Equal
+    }
+}
+
+impl Eq for RemoteLogDownloadRequest {}
+
+/// Result of a download task
+enum DownloadResult {
+    /// Successful download - deliver result to future
+    Success {
+        result: RemoteLogFile,
+        result_sender: oneshot::Sender<Result<RemoteLogFile>>,
+    },
+    /// Download failed - re-queue request for retry (Java pattern)
+    FailedRetry { request: RemoteLogDownloadRequest },
+    /// Download failed permanently after max retries - fail the future
+    FailedPermanently {
+        error: Error,
+        result_sender: oneshot::Sender<Result<RemoteLogFile>>,
+    },
+    /// Cancelled - don't deliver, don't re-queue
+    Cancelled,
+}
+
+/// Production implementation of RemoteLogFetcher that downloads from actual 
storage
+struct ProductionFetcher {
+    remote_fs_props: Arc<RwLock<HashMap<String, String>>>,
+    local_log_dir: Arc<TempDir>,
+}
+
+impl RemoteLogFetcher for ProductionFetcher {
+    fn fetch(
+        &self,
+        request: &RemoteLogDownloadRequest,
+    ) -> Pin<Box<dyn Future<Output = Result<FetchResult>> + Send>> {
+        let remote_fs_props = self.remote_fs_props.clone();
+        let local_log_dir = self.local_log_dir.clone();
+
+        // Clone data needed for async operation to avoid lifetime issues
+        let segment = request.segment.clone();
+        let remote_log_tablet_dir = request.remote_log_tablet_dir.to_string();
+
+        Box::pin(async move {
+            let local_file_name = segment.local_file_name();
+            let local_file_path = local_log_dir.path().join(&local_file_name);
+
+            // Build remote path
+            let offset_prefix = format!("{:020}", segment.start_offset);
+            let remote_path = format!(
+                "{}/{}/{}.log",
+                remote_log_tablet_dir, segment.segment_id, offset_prefix
+            );
+
+            let remote_fs_props_map = remote_fs_props.read().clone();
+
+            // Download file to disk (streaming, no memory spike)
+            let file_path = RemoteLogDownloader::download_file(
+                &remote_log_tablet_dir,
+                &remote_path,
+                &local_file_path,
+                &remote_fs_props_map,
+            )
+            .await?;
+
+            // Get file size
+            let metadata = tokio::fs::metadata(&file_path).await?;
+            let file_size = metadata.len() as usize;
+
+            // Return file path - file stays on disk until PrefetchPermit is 
dropped
+            Ok(FetchResult {
+                file_path,
+                file_size,
+            })
+        })
+    }
+}
+
+/// Coordinator that owns all download state and orchestrates downloads
+struct DownloadCoordinator {
+    download_queue: BinaryHeap<Reverse<RemoteLogDownloadRequest>>,
+    active_downloads: JoinSet<DownloadResult>,
+    in_flight: usize,
+    prefetch_semaphore: Arc<Semaphore>,
+    max_concurrent_downloads: usize,
+    recycle_notify: Arc<Notify>,
+    fetcher: Arc<dyn RemoteLogFetcher>,
+}
+
+impl DownloadCoordinator {
+    /// Check if we should wait for recycle notification
+    /// Only wait if we're blocked on permits AND have pending work
+    fn should_wait_for_recycle(&self) -> bool {
+        !self.download_queue.is_empty()
+            && self.in_flight < self.max_concurrent_downloads
+            && self.prefetch_semaphore.available_permits() == 0
+    }
+
+    /// Find the earliest retry deadline among pending requests
+    fn next_retry_deadline(&self) -> Option<tokio::time::Instant> {
+        self.download_queue
+            .iter()
+            .filter_map(|Reverse(req)| req.next_retry_at)
+            .min()
+    }
+}
+
+impl DownloadCoordinator {
+    /// Try to start as many downloads as possible (event-driven drain)
+    fn drain(&mut self) {
+        // Collect deferred requests (backoff not ready) to push back later
+        let mut deferred = Vec::new();
+        // Scan entire queue once to find ready requests (prevents 
head-of-line blocking)
+        // Bound to reasonable max to avoid excessive work if queue is huge
+        let max_scan = self.download_queue.len().min(100);
+        let mut scanned = 0;
+
+        while !self.download_queue.is_empty()
+            && self.in_flight < self.max_concurrent_downloads
+            && scanned < max_scan
+        {
+            // Try acquire prefetch permit (non-blocking)
+            let permit = match 
self.prefetch_semaphore.clone().try_acquire_owned() {
+                Ok(p) => p,
+                Err(_) => break, // No permits available
+            };
+
+            // Pop highest priority request
+            let Some(Reverse(request)) = self.download_queue.pop() else {
+                drop(permit);
+                break;
+            };
+
+            scanned += 1;
+
+            // Retry backoff check: defer if retry time hasn't arrived yet
+            if let Some(next_retry_at) = request.next_retry_at {
+                let now = tokio::time::Instant::now();
+                if next_retry_at > now {
+                    // Not ready for retry yet - defer and continue looking 
for ready requests
+                    drop(permit);
+                    deferred.push(request);
+                    continue; // Don't block - keep looking for ready requests
+                }
+            }
+
+            // Cancellation check: skip if sender closed
+            if request.result_sender.is_closed() {
+                drop(permit);
+                continue; // Try next request
+            }
+
+            // Clone data for the spawned task
+            let fetcher = self.fetcher.clone();
+            let recycle_notify = self.recycle_notify.clone();
+
+            // Spawn download task
+            self.active_downloads.spawn(async move {
+                spawn_download_task(request, permit, fetcher, 
recycle_notify).await
+            });
+            self.in_flight += 1;
+        }
+
+        // Push deferred requests back to queue (maintains priority order)
+        if !deferred.is_empty() {
+            for req in deferred {
+                self.download_queue.push(Reverse(req));
+            }
+        }
+    }
+}
+
+/// Spawn a download task that attempts download once
+/// Matches Java's RemoteLogDownloader.java
+///
+/// Benefits over infinite in-place retry:
+/// - Failed downloads don't block prefetch slots
+/// - Other segments can make progress while one is failing
+/// - Natural retry through coordinator re-picking from queue
+async fn spawn_download_task(
+    request: RemoteLogDownloadRequest,
+    permit: tokio::sync::OwnedSemaphorePermit,
+    fetcher: Arc<dyn RemoteLogFetcher>,
+    recycle_notify: Arc<Notify>,
+) -> DownloadResult {
+    // Check if receiver still alive (early cancellation check)
+    if request.result_sender.is_closed() {
+        drop(permit);
+        return DownloadResult::Cancelled;
+    }
+
+    // Try download ONCE
+    let download_result = fetcher.fetch(&request).await;
+
+    match download_result {
+        Ok(fetch_result) => {
+            // Success - permit will be released on drop (FileSource handles 
file deletion)
+            DownloadResult::Success {
+                result: RemoteLogFile {
+                    file_path: fetch_result.file_path,
+                    file_size: fetch_result.file_size,
+                    permit: PrefetchPermit::new(permit, 
recycle_notify.clone()),
+                },
+                result_sender: request.result_sender,
+            }
+        }
+        Err(e) if request.result_sender.is_closed() => {
+            // Receiver dropped (cancelled) - release permit, don't re-queue
+            drop(permit);
+            DownloadResult::Cancelled
+        }
+        Err(e) => {
+            // Download failed - check if we should retry or give up
+            let retry_count = request.retry_count + 1;
+
+            if retry_count > MAX_RETRY_COUNT {
+                // Too many retries - give up and fail the future
+                log::error!(
+                    "Failed to download remote log segment {} after {} 
retries: {}. Giving up.",
+                    request.segment.segment_id,
+                    retry_count,
+                    e
+                );
+                drop(permit); // Release immediately
+
+                DownloadResult::FailedPermanently {
+                    error: Error::UnexpectedError {
+                        message: format!(
+                            "Failed to download remote log segment after {} 
retries: {}",
+                            retry_count, e
+                        ),
+                        source: Some(Box::new(e)),
+                    },
+                    result_sender: request.result_sender,
+                }
+            } else {
+                // Retry with exponential backoff
+                let backoff_delay = calculate_backoff_delay(retry_count);
+                let next_retry_at = tokio::time::Instant::now() + 
backoff_delay;
+
+                log::warn!(
+                    "Failed to download remote log segment {}: {}. Retry {}/{} 
after {:?}",
+                    request.segment.segment_id,
+                    e,
+                    retry_count,
+                    MAX_RETRY_COUNT,
+                    backoff_delay
+                );
+                drop(permit); // Release immediately - critical!
+
+                // Update retry state
+                let mut retry_request = request;
+                retry_request.retry_count = retry_count;
+                retry_request.next_retry_at = Some(next_retry_at);
+
+                // Re-queue request to same priority queue
+                // Future stays with request, NOT completed - will complete on 
successful retry
+                DownloadResult::FailedRetry {
+                    request: retry_request,
+                }
+            }
+        }
+    }
+}
+
+/// Coordinator event loop - owns all download state and reacts to events
+async fn coordinator_loop(
+    mut coordinator: DownloadCoordinator,
+    mut request_receiver: mpsc::UnboundedReceiver<RemoteLogDownloadRequest>,
+) {
+    loop {
+        // Drain once at start of iteration to process ready work
+        coordinator.drain();
+
+        // Calculate sleep duration until next retry (if any deferred requests)
+        let next_retry_sleep = 
coordinator.next_retry_deadline().map(|deadline| {
+            let now = tokio::time::Instant::now();
+            if deadline > now {
+                deadline - now
+            } else {
+                tokio::time::Duration::from_millis(0) // Ready now
+            }
+        });
+
+        tokio::select! {
+            // Event 1: NewRequest
+            Some(request) = request_receiver.recv() => {
+                coordinator.download_queue.push(Reverse(request));
+                // Immediately try to start this download
+                continue;
+            }
+
+            // Event 2: DownloadFinished
+            Some(result) = coordinator.active_downloads.join_next() => {
+                coordinator.in_flight -= 1;
+
+                match result {
+                    Ok(DownloadResult::Success { result, result_sender }) => {
+                        // Success - deliver result to future
+                        if !result_sender.is_closed() {
+                            let _ = result_sender.send(Ok(result));
+                        }
+                        // Permit held in RemoteLogFile until consumed
+                    }
+                    Ok(DownloadResult::FailedRetry { request }) => {
+                        // Re-queue immediately (don't block coordinator with 
sleep)
+                        // The retry time will be checked in drain() before 
processing
+                        // (Java line 177: segmentsToFetch.add(request))
+                        // Permit already released (Java line 174)
+                        coordinator.download_queue.push(Reverse(request));
+                    }
+                    Ok(DownloadResult::FailedPermanently { error, 
result_sender }) => {
+                        // Permanent failure - deliver error to future
+                        if !result_sender.is_closed() {
+                            let _ = result_sender.send(Err(error));
+                        }
+                        // Permit already released
+                    }
+                    Ok(DownloadResult::Cancelled) => {
+                        // Cancelled - permit already released, nothing to do
+                    }
+                    Err(e) => {
+                        log::error!("Download task panicked: {:?}", e);
+                        // Permit already released via RAII
+                    }
+                }
+                // Immediately try to start another download
+                continue;
+            }
+
+            // Event 3: Recycled (only wait when blocked on permits with 
pending work)
+            _ = coordinator.recycle_notify.notified(),
+                if coordinator.should_wait_for_recycle() => {
+                // Wake up to try draining
+                continue;
+            }
+
+            // Event 4: Retry timer - wake up when next retry is ready
+            _ = 
tokio::time::sleep(next_retry_sleep.unwrap_or(tokio::time::Duration::from_secs(3600))),
+                if next_retry_sleep.is_some() => {
+                // Wake up to retry deferred requests
+                continue;
+            }
+
+            else => break,  // All channels closed AND no work pending
+        }
+    }
+}
+
 type CompletionCallback = Box<dyn Fn() + Send + Sync>;
 
 /// Future for a remote log download request
 pub struct RemoteLogDownloadFuture {
-    result: Arc<Mutex<Option<Result<Vec<u8>>>>>,
+    result: Arc<Mutex<Option<Result<RemoteLogFile>>>>,
     completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>>,
-    // todo: add recycleCallback
 }
 
 impl RemoteLogDownloadFuture {
-    pub fn new(receiver: oneshot::Receiver<Result<Vec<u8>>>) -> Self {
+    pub fn new(receiver: oneshot::Receiver<Result<RemoteLogFile>>) -> Self {
         let result = Arc::new(Mutex::new(None));
         let result_clone = Arc::clone(&result);
         let completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>> =
@@ -123,7 +646,7 @@ impl RemoteLogDownloadFuture {
             // This also ensures that any callbacks registered after this point will be called immediately
             let callbacks: Vec<CompletionCallback> = {
                 let mut callbacks_guard = callbacks_clone.lock();
-                std::mem::take(&mut *callbacks_guard)
+                mem::take(&mut *callbacks_guard)
             };
             for callback in callbacks {
                 callback();
@@ -172,40 +695,90 @@ impl RemoteLogDownloadFuture {
         self.result.lock().is_some()
     }
 
-    /// Get the downloaded file path (synchronous, only works after is_done() returns true)
-    pub fn get_remote_log_bytes(&self) -> Result<Vec<u8>> {
-        // todo: handle download fail
-        let guard = self.result.lock();
-        match guard.as_ref() {
-            Some(Ok(path)) => Ok(path.clone()),
-            Some(Err(e)) => Err(Error::IoUnexpectedError {
-                message: format!("Fail to get remote log bytes: {e}"),
-                source: io::Error::other(format!("{e:?}")),
-            }),
+    /// Take the RemoteLogFile (including the permit) from this future
+    /// This should only be called when the download is complete
+    /// This is the correct way to consume the download - it transfers permit ownership
+    pub fn take_remote_log_file(&self) -> Result<RemoteLogFile> {
+        let mut guard = self.result.lock();
+        match guard.take() {
+            Some(Ok(remote_log_file)) => Ok(remote_log_file),
+            Some(Err(e)) => {
+                let error_msg = format!("{e}");
+                Err(Error::IoUnexpectedError {
+                    message: format!("Fail to get remote log file: {error_msg}"),
+                    source: io::Error::other(error_msg),
+                })
+            }
             None => Err(Error::IoUnexpectedError {
-                message: "Get remote log bytes not completed yet".to_string(),
-                source: io::Error::other("Get remote log bytes not completed yet"),
+                message: "Remote log file already taken or not ready".to_string(),
+                source: io::Error::other("Remote log file already taken or not ready"),
             }),
         }
     }
 }
 
-/// Downloader for remote log segment files
+/// Downloader for remote log segment files.
+///
+/// # Shutdown behavior
+///
+/// When the downloader is dropped, the request channel closes, signaling the coordinator
+/// to stop accepting new work. The coordinator will finish any in-flight downloads but
+/// won't wait for completion. Pending futures will fail.
 pub struct RemoteLogDownloader {
-    local_log_dir: TempDir,
-    remote_fs_props: RwLock<HashMap<String, String>>,
+    request_sender: Option<mpsc::UnboundedSender<RemoteLogDownloadRequest>>,
+    remote_fs_props: Option<Arc<RwLock<HashMap<String, String>>>>,
 }
 
 impl RemoteLogDownloader {
-    pub fn new(local_log_dir: TempDir) -> Result<Self> {
+    pub fn new(
+        local_log_dir: TempDir,
+        max_prefetch_segments: usize,
+        max_concurrent_downloads: usize,
+    ) -> Result<Self> {
+        let remote_fs_props = Arc::new(RwLock::new(HashMap::new()));
+        let fetcher = Arc::new(ProductionFetcher {
+            remote_fs_props: remote_fs_props.clone(),
+            local_log_dir: Arc::new(local_log_dir),
+        });
+
+        let mut downloader =
+            Self::new_with_fetcher(fetcher, max_prefetch_segments, max_concurrent_downloads)?;
+        downloader.remote_fs_props = Some(remote_fs_props);
+        Ok(downloader)
+    }
+
+    /// Create a RemoteLogDownloader with a custom fetcher (for testing).
+    /// The remote_fs_props will be None since custom fetchers typically don't need S3 credentials.
+    pub fn new_with_fetcher(
+        fetcher: Arc<dyn RemoteLogFetcher>,
+        max_prefetch_segments: usize,
+        max_concurrent_downloads: usize,
+    ) -> Result<Self> {
+        let (request_sender, request_receiver) = mpsc::unbounded_channel();
+
+        let coordinator = DownloadCoordinator {
+            download_queue: BinaryHeap::new(),
+            active_downloads: JoinSet::new(),
+            in_flight: 0,
+            prefetch_semaphore: Arc::new(Semaphore::new(max_prefetch_segments)),
+            max_concurrent_downloads,
+            recycle_notify: Arc::new(Notify::new()),
+            fetcher,
+        };
+
+        // Spawn coordinator task - it will exit when request_sender is dropped
+        tokio::spawn(coordinator_loop(coordinator, request_receiver));
+
         Ok(Self {
-            local_log_dir,
-            remote_fs_props: RwLock::new(HashMap::new()),
+            request_sender: Some(request_sender),
+            remote_fs_props: None,
         })
     }
 
     pub fn set_remote_fs_props(&self, props: HashMap<String, String>) {
-        *self.remote_fs_props.write() = props;
+        if let Some(ref remote_fs_props) = self.remote_fs_props {
+            *remote_fs_props.write() = props;
+        }
     }
 
     /// Request to fetch a remote log segment to local. This method is non-blocking.
@@ -214,49 +787,44 @@ impl RemoteLogDownloader {
         remote_log_tablet_dir: &str,
         segment: &RemoteLogSegment,
     ) -> RemoteLogDownloadFuture {
-        let (sender, receiver) = oneshot::channel();
-        let local_file_name = segment.local_file_name();
-        let local_file_path = self.local_log_dir.path().join(&local_file_name);
-        let remote_path = self.build_remote_path(remote_log_tablet_dir, segment);
-        let remote_log_tablet_dir = remote_log_tablet_dir.to_string();
-        let remote_fs_props = self.remote_fs_props.read().clone();
-        // Spawn async download & read task
-        tokio::spawn(async move {
-            let result = async {
-                let file_path = Self::download_file(
-                    &remote_log_tablet_dir,
-                    &remote_path,
-                    &local_file_path,
-                    &remote_fs_props,
-                )
-                .await?;
-                let bytes = tokio::fs::read(&file_path).await?;
-
-                // Delete the downloaded local file to free disk (async, but we'll do it in background)
-                let file_path_clone = file_path.clone();
-                tokio::spawn(async move {
-                    let _ = delete_file(file_path_clone).await;
-                });
-
-                Ok(bytes)
+        let (result_sender, result_receiver) = oneshot::channel();
+
+        let request = RemoteLogDownloadRequest {
+            segment: segment.clone(),
+            remote_log_tablet_dir: remote_log_tablet_dir.to_string(),
+            result_sender,
+            retry_count: 0,
+            next_retry_at: None,
+        };
+
+        // Send to coordinator (non-blocking)
+        if let Some(ref sender) = self.request_sender {
+            if sender.send(request).is_err() {
+                // Coordinator is gone - immediately fail the future
+                let (error_sender, error_receiver) = oneshot::channel();
+                let _ = error_sender.send(Err(Error::UnexpectedError {
+                    message: "RemoteLogDownloader coordinator has shut down".to_string(),
+                    source: None,
+                }));
+                return RemoteLogDownloadFuture::new(error_receiver);
             }
-            .await;
+        }
 
-            let _ = sender.send(result);
-        });
-        RemoteLogDownloadFuture::new(receiver)
+        RemoteLogDownloadFuture::new(result_receiver)
     }
+}
 
-    /// Build the remote path for a log segment
-    fn build_remote_path(&self, remote_log_tablet_dir: &str, segment: &RemoteLogSegment) -> String {
-        // Format: ${remote_log_tablet_dir}/${segment_id}/${offset_prefix}.log
-        let offset_prefix = format!("{:020}", segment.start_offset);
-        format!(
-            "{}/{}/{}.log",
-            remote_log_tablet_dir, segment.segment_id, offset_prefix
-        )
+impl Drop for RemoteLogDownloader {
+    fn drop(&mut self) {
+        // Drop the request sender to signal coordinator shutdown.
+        // This causes request_receiver.recv() to return None, allowing the
+        // coordinator to exit gracefully after processing pending work.
+        // The coordinator task will finish on its own when it sees the channel closed.
+        drop(self.request_sender.take());
     }
+}
 
+impl RemoteLogDownloader {
     /// Download a file from remote storage to local using streaming read/write
     async fn download_file(
         remote_log_tablet_dir: &str,
@@ -293,7 +861,7 @@ impl RemoteLogDownloader {
         let (op, relative_path) = storage.create(remote_path)?;
 
         // Timeout for remote storage operations (30 seconds)
-        const REMOTE_OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
+        const REMOTE_OP_TIMEOUT: Duration = Duration::from_secs(30);
 
         // Get file metadata to know the size with timeout
         let meta = op.stat(relative_path).await?;
@@ -310,7 +878,7 @@ impl RemoteLogDownloader {
         let total_chunks = file_size.div_ceil(CHUNK_SIZE);
 
         while offset < file_size {
-            let end = std::cmp::min(offset + CHUNK_SIZE, file_size);
+            let end = min(offset + CHUNK_SIZE, file_size);
             let range = offset..end;
             chunk_count += 1;
 
@@ -347,70 +915,349 @@ impl RemoteLogDownloader {
     }
 }
 
-/// Pending fetch that waits for remote log file to be downloaded
-pub struct RemotePendingFetch {
-    segment: RemoteLogSegment,
-    download_future: RemoteLogDownloadFuture,
-    pos_in_log_segment: i32,
-    fetch_offset: i64,
-    high_watermark: i64,
-    read_context: ReadContext,
-}
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::sync::atomic::{AtomicUsize, Ordering};
 
-impl RemotePendingFetch {
-    pub fn new(
-        segment: RemoteLogSegment,
-        download_future: RemoteLogDownloadFuture,
-        pos_in_log_segment: i32,
-        fetch_offset: i64,
-        high_watermark: i64,
-        read_context: ReadContext,
-    ) -> Self {
-        Self {
+    /// Helper function to create a TableBucket for testing
+    fn create_table_bucket(table_id: i64, bucket_id: i32) -> TableBucket {
+        TableBucket::new(table_id, bucket_id)
+    }
+
+    /// Simplified fake fetcher for testing
+    struct FakeFetcher {
+        completion_gate: Arc<Notify>,
+        in_flight: Arc<AtomicUsize>,
+        max_seen_in_flight: Arc<AtomicUsize>,
+        fail_count: Arc<Mutex<usize>>,
+        auto_complete: bool,
+    }
+
+    impl FakeFetcher {
+        fn new(fail_count: usize, auto_complete: bool) -> Self {
+            Self {
+                completion_gate: Arc::new(Notify::new()),
+                in_flight: Arc::new(AtomicUsize::new(0)),
+                max_seen_in_flight: Arc::new(AtomicUsize::new(0)),
+                fail_count: Arc::new(Mutex::new(fail_count)),
+                auto_complete,
+            }
+        }
+
+        fn max_seen_in_flight(&self) -> usize {
+            self.max_seen_in_flight.load(Ordering::SeqCst)
+        }
+
+        fn in_flight(&self) -> usize {
+            self.in_flight.load(Ordering::SeqCst)
+        }
+
+        fn release_one(&self) {
+            self.completion_gate.notify_one();
+        }
+
+        fn release_all(&self) {
+            self.completion_gate.notify_waiters();
+        }
+    }
+
+    impl RemoteLogFetcher for FakeFetcher {
+        fn fetch(
+            &self,
+            request: &RemoteLogDownloadRequest,
+        ) -> Pin<Box<dyn Future<Output = Result<FetchResult>> + Send>> {
+            let gate = self.completion_gate.clone();
+            let in_flight = self.in_flight.clone();
+            let max_seen = self.max_seen_in_flight.clone();
+            let fail_count = self.fail_count.clone();
+            let segment_id = request.segment().segment_id.clone();
+            let auto_complete = self.auto_complete;
+
+            Box::pin(async move {
+                // Track in-flight
+                let current = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
+                max_seen.fetch_max(current, Ordering::SeqCst);
+
+                // Wait for gate (or auto-complete)
+                if !auto_complete {
+                    gate.notified().await;
+                } else {
+                    tokio::task::yield_now().await;
+                }
+
+                // Check if should fail
+                let should_fail = {
+                    let mut count = fail_count.lock();
+                    if *count > 0 {
+                        *count -= 1;
+                        true
+                    } else {
+                        false
+                    }
+                };
+
+                in_flight.fetch_sub(1, Ordering::SeqCst);
+
+                if should_fail {
+                    Err(Error::UnexpectedError {
+                        message: format!("Fake fetch failed for {}", segment_id),
+                        source: None,
+                    })
+                } else {
+                    let fake_data = vec![1, 2, 3, 4];
+                    let temp_dir = env::temp_dir();
+                    let timestamp = SystemTime::now()
+                        .duration_since(UNIX_EPOCH)
+                        .unwrap()
+                        .as_nanos();
+                    let file_path =
+                        temp_dir.join(format!("fake_segment_{}_{}.log", segment_id, timestamp));
+                    tokio::fs::write(&file_path, &fake_data).await?;
+
+                    Ok(FetchResult {
+                        file_path,
+                        file_size: fake_data.len(),
+                    })
+                }
+            })
+        }
+    }
+
+    /// Helper function to create a RemoteLogSegment for testing
+    fn create_segment(
+        segment_id: &str,
+        start_offset: i64,
+        max_timestamp: i64,
+        table_bucket: TableBucket,
+    ) -> RemoteLogSegment {
+        RemoteLogSegment {
+            segment_id: segment_id.to_string(),
+            start_offset,
+            end_offset: start_offset + 1000,
+            size_in_bytes: 1024,
+            table_bucket,
+            max_timestamp,
+        }
+    }
+
+    /// Helper function to create a RemoteLogDownloadRequest for testing
+    fn create_request(segment: RemoteLogSegment) -> RemoteLogDownloadRequest {
+        let (result_sender, _) = oneshot::channel();
+        RemoteLogDownloadRequest {
+            remote_log_tablet_dir: "test_dir".to_string(),
             segment,
-            download_future,
-            pos_in_log_segment,
-            fetch_offset,
-            high_watermark,
-            read_context,
+            result_sender,
+            retry_count: 0,
+            next_retry_at: None,
         }
     }
-}
 
-impl PendingFetch for RemotePendingFetch {
-    fn table_bucket(&self) -> &TableBucket {
-        &self.segment.table_bucket
+    #[test]
+    fn test_priority_ordering_matching_java_test_case() {
+        // Test priority ordering: timestamp across buckets, offset within bucket
+        // Does NOT test tie-breakers (segment_id) - those are implementation details
+
+        let bucket1 = create_table_bucket(1, 0);
+        let bucket2 = create_table_bucket(1, 1);
+        let bucket3 = create_table_bucket(1, 2);
+        let bucket4 = create_table_bucket(1, 3);
+
+        // Create segments with distinct timestamps/offsets (no ties)
+        let seg_negative = create_segment("seg_neg", 0, -1, bucket1.clone());
+        let seg_zero = create_segment("seg_zero", 0, 0, bucket2.clone());
+        let seg_1000 = create_segment("seg_1000", 0, 1000, bucket3.clone());
+        let seg_2000 = create_segment("seg_2000", 0, 2000, bucket4.clone());
+        let seg_same_bucket_100 = create_segment("seg_sb_100", 100, 5000, bucket1.clone());
+        let seg_same_bucket_50 = create_segment("seg_sb_50", 50, 5000, bucket1.clone());
+
+        let mut heap = BinaryHeap::new();
+        heap.push(Reverse(create_request(seg_2000)));
+        heap.push(Reverse(create_request(seg_same_bucket_100)));
+        heap.push(Reverse(create_request(seg_1000)));
+        heap.push(Reverse(create_request(seg_zero)));
+        heap.push(Reverse(create_request(seg_negative)));
+        heap.push(Reverse(create_request(seg_same_bucket_50)));
+
+        // Verify ordering by timestamp/offset, not segment_id
+        let first = heap.pop().unwrap().0;
+        assert_eq!(first.segment.max_timestamp, -1, "Lowest timestamp first");
+
+        let second = heap.pop().unwrap().0;
+        assert_eq!(second.segment.max_timestamp, 0);
+
+        let third = heap.pop().unwrap().0;
+        assert_eq!(third.segment.max_timestamp, 1000);
+
+        let fourth = heap.pop().unwrap().0;
+        assert_eq!(fourth.segment.max_timestamp, 2000);
+
+        // Last two are same bucket (ts=5000), ordered by offset
+        let fifth = heap.pop().unwrap().0;
+        assert_eq!(fifth.segment.max_timestamp, 5000);
+        assert_eq!(
+            fifth.segment.start_offset, 50,
+            "Lower offset first within bucket"
+        );
+
+        let sixth = heap.pop().unwrap().0;
+        assert_eq!(sixth.segment.max_timestamp, 5000);
+        assert_eq!(sixth.segment.start_offset, 100);
     }
 
-    fn is_completed(&self) -> bool {
-        self.download_future.is_done()
+    #[tokio::test]
+    async fn test_concurrency_and_priority() {
+        // Test concurrency limiting and priority-based scheduling together
+        let fake_fetcher = Arc::new(FakeFetcher::new(0, false)); // Manual control
+
+        let downloader = RemoteLogDownloader::new_with_fetcher(
+            fake_fetcher.clone(),
+            10, // High prefetch limit
+            2,  // Max concurrent downloads = 2
+        )
+        .unwrap();
+
+        let bucket = create_table_bucket(1, 0);
+
+        // Request 4 segments with same priority (to isolate concurrency limiting from priority)
+        let segs: Vec<_> = (0..4)
+            .map(|i| create_segment(&format!("seg{}", i), i * 100, 1000, bucket.clone()))
+            .collect();
+
+        let _futures: Vec<_> = segs
+            .iter()
+            .map(|seg| downloader.request_remote_log("dir", seg))
+            .collect();
+
+        // Wait for exactly 2 to start
+        tokio::time::sleep(Duration::from_millis(50)).await;
+        assert_eq!(
+            fake_fetcher.in_flight(),
+            2,
+            "Concurrency limit: exactly 2 should be in-flight"
+        );
+
+        // Release one
+        fake_fetcher.release_one();
+        tokio::time::sleep(Duration::from_millis(50)).await;
+
+        // Max should never exceed 2
+        assert_eq!(
+            fake_fetcher.max_seen_in_flight(),
+            2,
+            "Max concurrent should not exceed 2"
+        );
+
+        // Release all
+        fake_fetcher.release_all();
     }
 
-    fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
-        // Get the file path (this should only be called when is_completed() returns true)
-        let mut data = self.download_future.get_remote_log_bytes()?;
+    #[tokio::test]
+    async fn test_prefetch_limit() {
+        // Test that prefetch semaphore limits outstanding downloads
+        let fake_fetcher = Arc::new(FakeFetcher::new(0, true)); // Auto-complete
 
-        // Slice the data if needed
-        let data = if self.pos_in_log_segment > 0 {
-            data.split_off(self.pos_in_log_segment as usize)
-        } else {
-            data
-        };
+        let downloader = RemoteLogDownloader::new_with_fetcher(
+            fake_fetcher,
+            2,  // Max prefetch = 2
+            10, // High concurrent limit
+        )
+        .unwrap();
+
+        let bucket = create_table_bucket(1, 0);
 
-        let size_in_bytes = data.len();
+        // Request 4 downloads
+        let segs: Vec<_> = (0..4)
+            .map(|i| create_segment(&format!("seg{}", i), i * 100, 1000, bucket.clone()))
+            .collect();
 
-        let log_record_batch = LogRecordsBatches::new(data);
+        let mut futures: Vec<_> = segs
+            .iter()
+            .map(|seg| downloader.request_remote_log("dir", seg))
+            .collect();
 
-        // Create DefaultCompletedFetch from the data
-        let completed_fetch = DefaultCompletedFetch::new(
-            self.segment.table_bucket,
-            log_record_batch,
-            size_in_bytes,
-            self.read_context,
-            self.fetch_offset,
-            self.high_watermark,
+        // Wait for first 2 to complete
+        let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
+        loop {
+            if futures.iter().filter(|f| f.is_done()).count() >= 2 {
+                break;
+            }
+            if tokio::time::Instant::now() > deadline {
+                panic!("Timeout waiting for first 2 downloads");
+            }
+            tokio::time::sleep(Duration::from_millis(10)).await;
+        }
+
+        // Verify 3rd and 4th are blocked (prefetch limit)
+        tokio::time::sleep(Duration::from_millis(50)).await;
+        assert_eq!(
+            futures.iter().filter(|f| f.is_done()).count(),
+            2,
+            "Prefetch limit: only 2 should complete"
         );
 
-        Ok(Box::new(completed_fetch))
+        // Drop first 2 (releases permits)
+        let f4 = futures.pop().unwrap();
+        let f3 = futures.pop().unwrap();
+        drop(futures);
+
+        // 3rd and 4th should now complete
+        let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
+        loop {
+            if f3.is_done() && f4.is_done() {
+                break;
+            }
+            if tokio::time::Instant::now() > deadline {
+                panic!("Timeout after permit release");
+            }
+            tokio::time::sleep(Duration::from_millis(10)).await;
+        }
+    }
+
+    #[tokio::test]
+    async fn test_retry_and_cancellation() {
+        // Test retry with exponential backoff
+        let fake_fetcher = Arc::new(FakeFetcher::new(2, true)); // Fail twice, succeed third time
+
+        let downloader =
+            RemoteLogDownloader::new_with_fetcher(fake_fetcher.clone(), 10, 1).unwrap();
+
+        let bucket = create_table_bucket(1, 0);
+        let seg = create_segment("seg1", 0, 1000, bucket);
+
+        let future = downloader.request_remote_log("dir", &seg);
+
+        // Should succeed after retries
+        let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
+        loop {
+            if future.is_done() {
+                break;
+            }
+            if tokio::time::Instant::now() > deadline {
+                panic!("Timeout waiting for retry to succeed");
+            }
+            tokio::time::sleep(Duration::from_millis(50)).await;
+        }
+
+        assert!(future.is_done(), "Should succeed after retries");
+
+        // Test cancellation
+        let seg2 = create_segment("seg2", 100, 1000, create_table_bucket(1, 0));
+        let fake_fetcher2 = Arc::new(FakeFetcher::new(100, true)); // Fail forever
+        let downloader2 =
+            RemoteLogDownloader::new_with_fetcher(fake_fetcher2.clone(), 10, 1).unwrap();
+
+        let future2 = downloader2.request_remote_log("dir", &seg2);
+        tokio::time::sleep(Duration::from_millis(50)).await;
+
+        // Drop to cancel
+        drop(future2);
+        tokio::time::sleep(Duration::from_millis(50)).await;
+
+        assert_eq!(
+            fake_fetcher2.in_flight(),
+            0,
+            "Cancellation should release resources"
+        );
     }
 }
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index afa44f3..8712650 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -19,10 +19,12 @@ use arrow::array::RecordBatch;
 use arrow_schema::SchemaRef;
 use log::{debug, warn};
 use parking_lot::{Mutex, RwLock};
-use std::collections::{HashMap, HashSet};
-use std::slice::from_ref;
-use std::sync::Arc;
-use std::time::Duration;
+use std::{
+    collections::{HashMap, HashSet},
+    slice::from_ref,
+    sync::Arc,
+    time::{Duration, Instant},
+};
 use tempfile::TempDir;
 
 use crate::client::connection::FlussConnection;
@@ -30,11 +32,9 @@ use crate::client::credentials::CredentialsCache;
 use crate::client::metadata::Metadata;
 use crate::client::table::log_fetch_buffer::{
     CompletedFetch, DefaultCompletedFetch, FetchErrorAction, FetchErrorContext, FetchErrorLogLevel,
-    LogFetchBuffer,
-};
-use crate::client::table::remote_log::{
-    RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch,
+    LogFetchBuffer, RemotePendingFetch,
 };
+use crate::client::table::remote_log::{RemoteLogDownloader, RemoteLogFetchInfo};
 use crate::error::{ApiError, Error, FlussError, Result};
 use crate::metadata::{PhysicalTablePath, TableBucket, TableInfo, TablePath};
 use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
@@ -223,6 +223,7 @@ impl<'a> TableScan<'a> {
             &self.table_info,
             self.metadata.clone(),
             self.conn.get_connections(),
+            self.conn.config(),
             self.projected_fields,
         )?;
         Ok(LogScanner {
@@ -235,6 +236,7 @@ impl<'a> TableScan<'a> {
             &self.table_info,
             self.metadata.clone(),
             self.conn.get_connections(),
+            self.conn.config(),
             self.projected_fields,
         )?;
         Ok(RecordBatchLogScanner {
@@ -273,6 +275,7 @@ impl LogScannerInner {
         table_info: &TableInfo,
         metadata: Arc<Metadata>,
         connections: Arc<RpcClient>,
+        config: &crate::config::Config,
         projected_fields: Option<Vec<usize>>,
     ) -> Result<Self> {
         let log_scanner_status = Arc::new(LogScannerStatus::new());
@@ -286,13 +289,14 @@ impl LogScannerInner {
                 connections.clone(),
                 metadata.clone(),
                 log_scanner_status.clone(),
+                config,
                 projected_fields,
             )?,
         })
     }
 
     async fn poll_records(&self, timeout: Duration) -> Result<ScanRecords> {
-        let start = std::time::Instant::now();
+        let start = Instant::now();
         let deadline = start + timeout;
 
         loop {
@@ -307,7 +311,7 @@ impl LogScannerInner {
             }
 
             // No data available, check if we should wait
-            let now = std::time::Instant::now();
+            let now = Instant::now();
             if now >= deadline {
                 // Timeout reached, return empty result
                 return Ok(ScanRecords::new(HashMap::new()));
@@ -376,7 +380,7 @@ impl LogScannerInner {
     }
 
     async fn poll_batches(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
-        let start = std::time::Instant::now();
+        let start = Instant::now();
         let deadline = start + timeout;
 
         loop {
@@ -387,7 +391,7 @@ impl LogScannerInner {
                 return Ok(batches);
             }
 
-            let now = std::time::Instant::now();
+            let now = Instant::now();
             if now >= deadline {
                 return Ok(Vec::new());
             }
@@ -478,6 +482,7 @@ impl LogFetcher {
         conns: Arc<RpcClient>,
         metadata: Arc<Metadata>,
         log_scanner_status: Arc<LogScannerStatus>,
+        config: &crate::config::Config,
         projected_fields: Option<Vec<usize>>,
     ) -> Result<Self> {
         let full_arrow_schema = to_arrow_schema(table_info.get_row_type())?;
@@ -497,7 +502,11 @@ impl LogFetcher {
             log_scanner_status,
             read_context,
             remote_read_context,
-            remote_log_downloader: Arc::new(RemoteLogDownloader::new(tmp_dir)?),
+            remote_log_downloader: Arc::new(RemoteLogDownloader::new(
+                tmp_dir,
+                config.scanner_remote_log_prefetch_num,
+                config.scanner_remote_log_download_threads,
+            )?),
             credentials_cache: Arc::new(CredentialsCache::new(conns.clone(), metadata.clone())),
             log_fetch_buffer,
             nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())),
@@ -1510,6 +1519,7 @@ mod tests {
             Arc::new(RpcClient::new()),
             metadata,
             status.clone(),
+            &crate::config::Config::default(),
             None,
         )?;
 
@@ -1529,8 +1539,8 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn fetch_records_from_fetch_drains_unassigned_bucket() -> Result<()> {
+    #[tokio::test]
+    async fn fetch_records_from_fetch_drains_unassigned_bucket() -> Result<()> {
         let table_path = TablePath::new("db".to_string(), "tbl".to_string());
         let table_info = build_table_info(table_path.clone(), 1, 1);
         let cluster = build_cluster_arc(&table_path, 1, 1);
@@ -1541,6 +1551,7 @@ mod tests {
             Arc::new(RpcClient::new()),
             metadata,
             status,
+            &crate::config::Config::default(),
             None,
         )?;
 
@@ -1576,6 +1587,7 @@ mod tests {
             Arc::new(RpcClient::new()),
             metadata,
             status,
+            &crate::config::Config::default(),
             None,
         )?;
 
@@ -1599,6 +1611,7 @@ mod tests {
             Arc::new(RpcClient::new()),
             metadata.clone(),
             status.clone(),
+            &crate::config::Config::default(),
             None,
         )?;
 
@@ -1649,6 +1662,7 @@ mod tests {
             Arc::new(RpcClient::new()),
             metadata.clone(),
             status.clone(),
+            &crate::config::Config::default(),
             None,
         )?;
 
diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs
index 92f600e..705e241 100644
--- a/crates/fluss/src/config.rs
+++ b/crates/fluss/src/config.rs
@@ -36,6 +36,16 @@ pub struct Config {
 
     #[arg(long, default_value_t = 2 * 1024 * 1024)]
     pub writer_batch_size: i32,
+
+    /// Maximum number of remote log segments to prefetch
+    /// Default: 4 (matching Java CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM)
+    #[arg(long, default_value_t = 4)]
+    pub scanner_remote_log_prefetch_num: usize,
+
+    /// Maximum concurrent remote log downloads
+    /// Default: 3 (matching Java REMOTE_FILE_DOWNLOAD_THREAD_NUM)
+    #[arg(long, default_value_t = 3)]
+    pub scanner_remote_log_download_threads: usize,
 }
 
 impl Default for Config {
@@ -46,6 +56,8 @@ impl Default for Config {
             writer_acks: String::from("all"),
             writer_retries: i32::MAX,
             writer_batch_size: 2 * 1024 * 1024,
+            scanner_remote_log_prefetch_num: 4,
+            scanner_remote_log_download_threads: 3,
         }
     }
 }
diff --git a/crates/fluss/src/proto/fluss_api.proto b/crates/fluss/src/proto/fluss_api.proto
index eaee94c..65eddce 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -306,6 +306,7 @@ message PbRemoteLogSegment {
   required int64 remote_log_start_offset = 2;
   required int64 remote_log_end_offset = 3;
   required int32 segment_size_in_bytes = 4;
+  optional int64 max_timestamp = 5;
 }
 
 message PbListOffsetsRespForBucket {
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 39114d3..4bfdc71 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -46,7 +46,10 @@ use byteorder::{ByteOrder, LittleEndian};
 use bytes::Bytes;
 use crc32c::crc32c;
 use std::{
-    io::{Cursor, Write},
+    collections::HashMap,
+    fs::File,
+    io::{Cursor, Read, Seek, SeekFrom, Write},
+    path::PathBuf,
     sync::Arc,
 };
 
@@ -82,6 +85,11 @@ pub const RECORD_BATCH_HEADER_SIZE: usize = RECORDS_OFFSET;
 pub const ARROW_CHANGETYPE_OFFSET: usize = RECORD_BATCH_HEADER_SIZE;
 pub const LOG_OVERHEAD: usize = LENGTH_OFFSET + LENGTH_LENGTH;
 
+/// Maximum batch size matches Java's Integer.MAX_VALUE limit.
+/// Java uses int type for batch size, so max value is 2^31 - 1 = 
2,147,483,647 bytes (~2GB).
+/// This is the implicit limit in FileLogRecords.java and other Java 
components.
+pub const MAX_BATCH_SIZE: usize = i32::MAX as usize; // 2,147,483,647 bytes 
(~2GB)
+
 /// const for record
 /// The "magic" values.
 #[derive(Debug, Clone, Copy)]
@@ -89,6 +97,49 @@ pub enum LogMagicValue {
     V0 = 0,
 }
 
+/// Safely convert batch size from i32 to usize with validation.
+///
+/// Validates that:
+/// - batch_size_bytes is non-negative
+/// - batch_size_bytes + LOG_OVERHEAD doesn't overflow
+/// - Result is within reasonable bounds
+fn validate_batch_size(batch_size_bytes: i32) -> Result<usize> {
+    // Check for negative size (corrupted data)
+    if batch_size_bytes < 0 {
+        return Err(Error::UnexpectedError {
+            message: format!("Invalid negative batch size: {}", 
batch_size_bytes),
+            source: None,
+        });
+    }
+
+    let batch_size_u = batch_size_bytes as usize;
+
+    // Check for overflow when adding LOG_OVERHEAD
+    let total_size =
+        batch_size_u
+            .checked_add(LOG_OVERHEAD)
+            .ok_or_else(|| Error::UnexpectedError {
+                message: format!(
+                    "Batch size {} + LOG_OVERHEAD {} would overflow",
+                    batch_size_u, LOG_OVERHEAD
+                ),
+                source: None,
+            })?;
+
+    // Sanity check: reject unreasonably large batches
+    if total_size > MAX_BATCH_SIZE {
+        return Err(Error::UnexpectedError {
+            message: format!(
+                "Batch size {} exceeds maximum allowed size {}",
+                total_size, MAX_BATCH_SIZE
+            ),
+            source: None,
+        });
+    }
+
+    Ok(total_size)
+}
+
 // NOTE: Rust layout/offsets currently match Java only for V0.
 // TODO: Add V1 layout/offsets to keep parity with Java's V1 format.
 pub const CURRENT_LOG_MAGIC_VALUE: u8 = LogMagicValue::V0 as u8;
@@ -441,53 +492,280 @@ pub trait ToArrow {
     fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()>;
 }
 
-pub struct LogRecordsBatches {
+/// In-memory log record source.
+/// Used for local tablet server fetches (existing path).
+struct MemorySource {
     data: Bytes,
+}
+
+impl MemorySource {
+    fn new(data: Vec<u8>) -> Self {
+        Self {
+            data: Bytes::from(data),
+        }
+    }
+
+    fn read_batch_header(&mut self, pos: usize) -> Result<(i64, usize)> {
+        if pos + LOG_OVERHEAD > self.data.len() {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "Position {} + LOG_OVERHEAD {} exceeds data size {}",
+                    pos,
+                    LOG_OVERHEAD,
+                    self.data.len()
+                ),
+                source: None,
+            });
+        }
+
+        let base_offset = LittleEndian::read_i64(&self.data[pos + 
BASE_OFFSET_OFFSET..]);
+        let batch_size_bytes = LittleEndian::read_i32(&self.data[pos + 
LENGTH_OFFSET..]);
+
+        // Validate batch size to prevent integer overflow and corruption
+        let batch_size = validate_batch_size(batch_size_bytes)?;
+
+        Ok((base_offset, batch_size))
+    }
+
+    fn read_batch_data(&mut self, pos: usize, size: usize) -> Result<Bytes> {
+        if pos + size > self.data.len() {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "Read beyond data size: {} + {} > {}",
+                    pos,
+                    size,
+                    self.data.len()
+                ),
+                source: None,
+            });
+        }
+        // Zero-copy slice (Bytes is Arc-based)
+        Ok(self.data.slice(pos..pos + size))
+    }
+
+    fn total_size(&self) -> usize {
+        self.data.len()
+    }
+}
+
+/// RAII guard that deletes a file when dropped.
+/// Used to ensure file deletion happens AFTER the file handle is closed.
+struct FileCleanupGuard {
+    file_path: PathBuf,
+}
+
+impl Drop for FileCleanupGuard {
+    fn drop(&mut self) {
+        // File handle is already closed (this guard drops after the file 
field)
+        if let Err(e) = std::fs::remove_file(&self.file_path) {
+            log::warn!(
+                "Failed to delete remote log file {}: {}",
+                self.file_path.display(),
+                e
+            );
+        } else {
+            log::debug!("Deleted remote log file: {}", 
self.file_path.display());
+        }
+    }
+}
+
+/// File-backed log record source.
+/// Used for remote log segments downloaded to local disk.
+/// Streams data on-demand instead of loading entire file into memory.
+///
+/// Uses seek + read_exact for cross-platform compatibility.
+/// Access pattern is sequential iteration (single consumer).
+struct FileSource {
+    file: File,
+    file_size: usize,
+    base_offset: usize,
+    _cleanup: Option<FileCleanupGuard>, // Drops AFTER file (field order 
matters!)
+}
+
+impl FileSource {
+    /// Create a new FileSource.
+    ///
+    /// The file at `file_path` will be deleted when this FileSource is 
dropped.
+    fn new(file: File, base_offset: usize, file_path: PathBuf) -> Result<Self> 
{
+        let file_size = file.metadata()?.len() as usize;
+
+        // Validate base_offset to prevent underflow in total_size()
+        if base_offset > file_size {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "base_offset ({}) exceeds file_size ({})",
+                    base_offset, file_size
+                ),
+                source: None,
+            });
+        }
+
+        Ok(Self {
+            file,
+            file_size,
+            base_offset,
+            _cleanup: Some(FileCleanupGuard { file_path }),
+        })
+    }
+
+    /// Read data at a specific position using seek + read_exact.
+    /// This is cross-platform and adequate for sequential access patterns.
+    fn read_at(&mut self, pos: u64, buf: &mut [u8]) -> Result<()> {
+        self.file.seek(SeekFrom::Start(pos))?;
+        self.file.read_exact(buf)?;
+        Ok(())
+    }
+
+    fn read_batch_header(&mut self, pos: usize) -> Result<(i64, usize)> {
+        let actual_pos = self.base_offset + pos;
+        if actual_pos + LOG_OVERHEAD > self.file_size {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "Position {} exceeds file size {}",
+                    actual_pos, self.file_size
+                ),
+                source: None,
+            });
+        }
+
+        // Read only the header to extract base_offset and batch_size
+        let mut header_buf = vec![0u8; LOG_OVERHEAD];
+        self.read_at(actual_pos as u64, &mut header_buf)?;
+
+        let base_offset = 
LittleEndian::read_i64(&header_buf[BASE_OFFSET_OFFSET..]);
+        let batch_size_bytes = 
LittleEndian::read_i32(&header_buf[LENGTH_OFFSET..]);
+
+        // Validate batch size to prevent integer overflow and corruption
+        let batch_size = validate_batch_size(batch_size_bytes)?;
+
+        Ok((base_offset, batch_size))
+    }
+
+    fn read_batch_data(&mut self, pos: usize, size: usize) -> Result<Bytes> {
+        let actual_pos = self.base_offset + pos;
+        if actual_pos + size > self.file_size {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "Read beyond file size: {} + {} > {}",
+                    actual_pos, size, self.file_size
+                ),
+                source: None,
+            });
+        }
+
+        // Read the full batch data
+        let mut batch_buf = vec![0u8; size];
+        self.read_at(actual_pos as u64, &mut batch_buf)?;
+
+        Ok(Bytes::from(batch_buf))
+    }
+
+    fn total_size(&self) -> usize {
+        self.file_size - self.base_offset
+    }
+}
+
+/// Enum for different log record sources.
+enum LogRecordsSource {
+    Memory(MemorySource),
+    File(FileSource),
+}
+
+impl LogRecordsSource {
+    fn read_batch_header(&mut self, pos: usize) -> Result<(i64, usize)> {
+        match self {
+            Self::Memory(s) => s.read_batch_header(pos),
+            Self::File(s) => s.read_batch_header(pos),
+        }
+    }
+
+    fn read_batch_data(&mut self, pos: usize, size: usize) -> Result<Bytes> {
+        match self {
+            Self::Memory(s) => s.read_batch_data(pos, size),
+            Self::File(s) => s.read_batch_data(pos, size),
+        }
+    }
+
+    fn total_size(&self) -> usize {
+        match self {
+            Self::Memory(s) => s.total_size(),
+            Self::File(s) => s.total_size(),
+        }
+    }
+}
+
+pub struct LogRecordsBatches {
+    source: LogRecordsSource,
     current_pos: usize,
     remaining_bytes: usize,
 }
 
 impl LogRecordsBatches {
+    /// Create from in-memory Vec (existing path - backward compatible).
     pub fn new(data: Vec<u8>) -> Self {
-        let remaining_bytes: usize = data.len();
+        let source = LogRecordsSource::Memory(MemorySource::new(data));
+        let remaining_bytes = source.total_size();
         Self {
-            data: Bytes::from(data),
+            source,
             current_pos: 0,
             remaining_bytes,
         }
     }
 
-    pub fn next_batch_size(&self) -> Option<usize> {
+    /// Create from file.
+    /// Enables streaming without loading entire file into memory.
+    ///
+    /// The file at `file_path` will be deleted when dropped.
+    /// This ensures the file is closed before deletion.
+    pub fn from_file(file: File, base_offset: usize, file_path: PathBuf) -> 
Result<Self> {
+        let source = FileSource::new(file, base_offset, file_path)?;
+        let remaining_bytes = source.total_size();
+        Ok(Self {
+            source: LogRecordsSource::File(source),
+            current_pos: 0,
+            remaining_bytes,
+        })
+    }
+
+    /// Try to get the size of the next batch.
+    fn next_batch_size(&mut self) -> Result<Option<usize>> {
         if self.remaining_bytes < LOG_OVERHEAD {
-            return None;
+            return Ok(None);
         }
 
-        let batch_size_bytes =
-            LittleEndian::read_i32(self.data.get(self.current_pos + 
LENGTH_OFFSET..).unwrap());
-        let batch_size = batch_size_bytes as usize + LOG_OVERHEAD;
-        if batch_size > self.remaining_bytes {
-            return None;
+        // Read only header to get size
+        match self.source.read_batch_header(self.current_pos) {
+            Ok((_base_offset, batch_size)) => {
+                if batch_size > self.remaining_bytes {
+                    Ok(None)
+                } else {
+                    Ok(Some(batch_size))
+                }
+            }
+            Err(e) => Err(e),
         }
-        Some(batch_size)
     }
 }
 
 impl Iterator for LogRecordsBatches {
-    type Item = LogRecordBatch;
+    type Item = Result<LogRecordBatch>;
 
     fn next(&mut self) -> Option<Self::Item> {
         match self.next_batch_size() {
-            Some(batch_size) => {
-                let start = self.current_pos;
-                let end = start + batch_size;
-                // Since LogRecordsBatches owns the Vec<u8>, the slice is valid
-                // as long as the mutable reference exists, which is 'a
-                let record_batch = 
LogRecordBatch::new(self.data.slice(start..end));
-                self.current_pos += batch_size;
-                self.remaining_bytes -= batch_size;
-                Some(record_batch)
+            Ok(Some(batch_size)) => {
+                // Read full batch data on-demand
+                match self.source.read_batch_data(self.current_pos, 
batch_size) {
+                    Ok(data) => {
+                        let record_batch = LogRecordBatch::new(data);
+                        self.current_pos += batch_size;
+                        self.remaining_bytes -= batch_size;
+                        Some(Ok(record_batch))
+                    }
+                    Err(e) => Some(Err(e)),
+                }
             }
-            None => None,
+            Ok(None) => None,
+            Err(e) => Some(Err(e)),
         }
     }
 }
@@ -1012,7 +1290,7 @@ impl ReadContext {
             &body_buffer,
             batch_metadata,
             resolve_schema,
-            &std::collections::HashMap::new(),
+            &HashMap::new(),
             None,
             &version,
         )?;
@@ -1052,7 +1330,7 @@ impl ReadContext {
             &body_buffer,
             batch_metadata,
             self.full_schema.clone(),
-            &std::collections::HashMap::new(),
+            &HashMap::new(),
             None,
             &version,
         )?;
@@ -1157,7 +1435,7 @@ pub struct MyVec<T>(pub StreamReader<T>);
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::metadata::{DataField, DataTypes};
+    use crate::metadata::{DataField, DataTypes, RowType};
 
     #[test]
     fn test_to_array_type() {
@@ -1456,6 +1734,49 @@ mod tests {
         Ok(())
     }
 
+    // Tests for file-backed streaming
+
+    #[test]
+    fn test_file_source_streaming() -> Result<()> {
+        use tempfile::NamedTempFile;
+
+        // Test 1: Basic file reads work
+        let test_data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
+        let mut tmp_file = NamedTempFile::new()?;
+        tmp_file.write_all(&test_data)?;
+        tmp_file.flush()?;
+
+        let file_path = tmp_file.path().to_path_buf();
+        let file = File::open(&file_path)?;
+        let mut source = FileSource::new(file, 0, file_path)?;
+
+        // Read full data
+        let data = source.read_batch_data(0, 10)?;
+        assert_eq!(data.to_vec(), test_data);
+
+        // Read partial data
+        let partial = source.read_batch_data(2, 5)?;
+        assert_eq!(partial.to_vec(), vec![3, 4, 5, 6, 7]);
+
+        // Test 2: base_offset works (critical for remote logs with 
pos_in_log_segment)
+        let prefix = vec![0xFF; 100];
+        let actual_data = vec![1, 2, 3, 4, 5];
+        let mut tmp_file2 = NamedTempFile::new()?;
+        tmp_file2.write_all(&prefix)?;
+        tmp_file2.write_all(&actual_data)?;
+        tmp_file2.flush()?;
+
+        let file_path2 = tmp_file2.path().to_path_buf();
+        let file2 = File::open(&file_path2)?;
+        let mut source2 = FileSource::new(file2, 100, file_path2)?; // Skip 
first 100 bytes
+
+        assert_eq!(source2.total_size(), 5); // Only counts data after offset
+        let data2 = source2.read_batch_data(0, 5)?;
+        assert_eq!(data2.to_vec(), actual_data);
+
+        Ok(())
+    }
+
     #[test]
     fn test_all_types_end_to_end() -> Result<()> {
         use crate::row::{Date, Datum, Decimal, GenericRow, Time, TimestampLtz, 
TimestampNtz};
@@ -1590,4 +1911,63 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_log_records_batches_from_file() -> Result<()> {
+        use crate::client::WriteRecord;
+        use crate::compression::{
+            ArrowCompressionInfo, ArrowCompressionType, 
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+        };
+        use crate::metadata::TablePath;
+        use crate::row::GenericRow;
+        use tempfile::NamedTempFile;
+
+        // Integration test: Real log record batch streamed from file
+        let row_type = RowType::new(vec![
+            DataField::new("id".to_string(), DataTypes::int(), None),
+            DataField::new("name".to_string(), DataTypes::string(), None),
+        ]);
+        let table_path = Arc::new(TablePath::new("db".to_string(), 
"tbl".to_string()));
+
+        let mut builder = MemoryLogRecordsArrowBuilder::new(
+            1,
+            &row_type,
+            false,
+            ArrowCompressionInfo {
+                compression_type: ArrowCompressionType::None,
+                compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+            },
+        )?;
+
+        let mut row = GenericRow::new();
+        row.set_field(0, 1_i32);
+        row.set_field(1, "alice");
+        let record = WriteRecord::for_append(table_path.clone(), 1, row);
+        builder.append(&record)?;
+
+        let mut row2 = GenericRow::new();
+        row2.set_field(0, 2_i32);
+        row2.set_field(1, "bob");
+        let record2 = WriteRecord::for_append(table_path, 2, row2);
+        builder.append(&record2)?;
+
+        let data = builder.build()?;
+
+        // Write to file
+        let mut tmp_file = NamedTempFile::new()?;
+        tmp_file.write_all(&data)?;
+        tmp_file.flush()?;
+
+        // Create file-backed LogRecordsBatches (should stream, not load all 
into memory)
+        let file_path = tmp_file.path().to_path_buf();
+        let file = File::open(&file_path)?;
+        let mut batches = LogRecordsBatches::from_file(file, 0, file_path)?;
+
+        // Iterate through batches (should work just like in-memory)
+        let batch = batches.next().expect("Should have at least one batch")?;
+        assert!(batch.size_in_bytes() > 0);
+        assert_eq!(batch.record_count(), 2);
+
+        Ok(())
+    }
 }
diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs
index 30424e5..156ef04 100644
--- a/crates/fluss/src/util/mod.rs
+++ b/crates/fluss/src/util/mod.rs
@@ -22,7 +22,6 @@ use crate::metadata::TableBucket;
 use linked_hash_map::LinkedHashMap;
 use std::collections::{HashMap, HashSet};
 use std::hash::Hash;
-use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 
@@ -33,11 +32,9 @@ pub fn current_time_ms() -> i64 {
         .as_millis() as i64
 }
 
-pub async fn delete_file(file_path: PathBuf) {
-    tokio::fs::remove_file(&file_path)
-        .await
-        .unwrap_or_else(|err| log::warn!("Could not delete file: 
{file_path:?}, error: {err:?}"));
-}
+// Removed: delete_file() is no longer used.
+// File cleanup is now handled via RAII with FileCleanupGuard in arrow.rs
+// which uses Rust's drop order to ensure files are closed before deletion.
 
 pub struct FairBucketStatusMap<S> {
     map: LinkedHashMap<TableBucket, Arc<S>>,

Reply via email to