luoyuxia commented on code in PR #187:
URL: https://github.com/apache/fluss-rust/pull/187#discussion_r2724235343


##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -172,40 +695,119 @@ impl RemoteLogDownloadFuture {
         self.result.lock().is_some()
     }
 
-    /// Get the downloaded file path (synchronous, only works after is_done() returns true)
-    pub fn get_remote_log_bytes(&self) -> Result<Vec<u8>> {
-        // todo: handle download fail
-        let guard = self.result.lock();
-        match guard.as_ref() {
-            Some(Ok(path)) => Ok(path.clone()),
-            Some(Err(e)) => Err(Error::IoUnexpectedError {
-                message: format!("Fail to get remote log bytes: {e}"),
-                source: io::Error::other(format!("{e:?}")),
-            }),
+    /// Take the RemoteLogFile (including the permit) from this future
+    /// This should only be called when the download is complete
+    /// This is the correct way to consume the download - it transfers permit ownership
+    pub fn take_remote_log_file(&self) -> Result<RemoteLogFile> {
+        let mut guard = self.result.lock();
+        match guard.take() {
+            Some(Ok(remote_log_file)) => Ok(remote_log_file),
+            Some(Err(e)) => {
+                let error_msg = format!("{e}");
+                Err(Error::IoUnexpectedError {
+                    message: format!("Fail to get remote log file: 
{error_msg}"),
+                    source: io::Error::other(error_msg),
+                })
+            }
             None => Err(Error::IoUnexpectedError {
-                message: "Get remote log bytes not completed yet".to_string(),
-                source: io::Error::other("Get remote log bytes not completed 
yet"),
+                message: "Remote log file already taken or not 
ready".to_string(),
+                source: io::Error::other("Remote log file already taken or not 
ready"),
             }),
         }
     }
 }
 
 /// Downloader for remote log segment files
 pub struct RemoteLogDownloader {
-    local_log_dir: TempDir,
-    remote_fs_props: RwLock<HashMap<String, String>>,
+    request_sender: Option<mpsc::UnboundedSender<RemoteLogDownloadRequest>>,
+    remote_fs_props: Option<Arc<RwLock<HashMap<String, String>>>>,
+    /// Handle to the coordinator task. Used for graceful shutdown via shutdown() method.
+    #[allow(dead_code)]
+    coordinator_handle: Option<tokio::task::JoinHandle<()>>,
 }
 
 impl RemoteLogDownloader {
-    pub fn new(local_log_dir: TempDir) -> Result<Self> {
+    pub fn new(

Review Comment:
   nit:
   can be:
   ```
   pub fn new(
           local_log_dir: TempDir,
           max_prefetch_segments: usize,
           max_concurrent_downloads: usize,
       ) -> Result<Self> {
           let remote_fs_props = Arc::new(RwLock::new(HashMap::new()));
           let fetcher = Arc::new(ProductionFetcher {
               remote_fs_props: remote_fs_props.clone(),
               local_log_dir: Arc::new(local_log_dir),
           });
   
           Self::new_with_fetcher(
               fetcher,
               max_prefetch_segments,
               max_concurrent_downloads,
           )
       }
   
       /// Create a RemoteLogDownloader with a custom fetcher (for testing)
       /// The remote_fs_props will be None since custom fetchers typically don't need S3 credentials
       pub fn new_with_fetcher(
           fetcher: Arc<dyn RemoteLogFetcher>,
           max_prefetch_segments: usize,
           max_concurrent_downloads: usize,
       ) -> Result<Self> {
           let (request_sender, request_receiver) = mpsc::unbounded_channel();
   
           let coordinator = DownloadCoordinator {
               download_queue: BinaryHeap::new(),
               active_downloads: JoinSet::new(),
               in_flight: 0,
           prefetch_semaphore: Arc::new(Semaphore::new(max_prefetch_segments)),
               max_concurrent_downloads,
               recycle_notify: Arc::new(Notify::new()),
               fetcher,
           };
   
           let coordinator_handle = tokio::spawn(coordinator_loop(coordinator, request_receiver));
   
           Ok(Self {
               request_sender: Some(request_sender),
               remote_fs_props: None,
               coordinator_handle: Some(coordinator_handle),
           })
       }
   ```
   ?
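   For what it's worth, the appeal of this shape is that `new` and `new_with_fetcher` share a single coordinator-setup path instead of duplicating it. A minimal sketch of the resulting call sites (values are illustrative; the `?` on `TempDir::new()` assumes the crate's `Error` implements `From<std::io::Error>`):
   ```
   // Production path: three-argument constructor, fetcher built internally.
   let downloader = RemoteLogDownloader::new(
       TempDir::new()?, // local staging dir for downloaded segments
       4,               // max_prefetch_segments (illustrative)
       2,               // max_concurrent_downloads (illustrative)
   )?;

   // Test path: inject any RemoteLogFetcher implementation directly.
   let downloader = RemoteLogDownloader::new_with_fetcher(fetcher, 4, 2)?;
   ```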



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -441,53 +492,316 @@ pub trait ToArrow {
     fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()>;
 }
 
-pub struct LogRecordsBatches {
+/// In-memory log record source.
+/// Used for local tablet server fetches (existing path).
+struct MemorySource {
     data: Bytes,
+}
+
+impl MemorySource {
+    fn new(data: Vec<u8>) -> Self {
+        Self {
+            data: Bytes::from(data),
+        }
+    }
+
+    fn read_batch_header(&mut self, pos: usize) -> Result<(i64, usize)> {
+        if pos + LOG_OVERHEAD > self.data.len() {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "Position {} + LOG_OVERHEAD {} exceeds data size {}",
+                    pos,
+                    LOG_OVERHEAD,
+                    self.data.len()
+                ),
+                source: None,
+            });
+        }
+
+        let base_offset = LittleEndian::read_i64(&self.data[pos + BASE_OFFSET_OFFSET..]);
+        let batch_size_bytes = LittleEndian::read_i32(&self.data[pos + LENGTH_OFFSET..]);
+
+        // Validate batch size to prevent integer overflow and corruption
+        let batch_size = validate_batch_size(batch_size_bytes)?;
+
+        Ok((base_offset, batch_size))
+    }
+
+    fn read_batch_data(&mut self, pos: usize, size: usize) -> Result<Bytes> {
+        if pos + size > self.data.len() {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "Read beyond data size: {} + {} > {}",
+                    pos,
+                    size,
+                    self.data.len()
+                ),
+                source: None,
+            });
+        }
+        // Zero-copy slice (Bytes is Arc-based)
+        Ok(self.data.slice(pos..pos + size))
+    }
+
+    fn total_size(&self) -> usize {
+        self.data.len()
+    }
+}
+
+/// RAII guard that deletes a file when dropped.
+/// Used to ensure file deletion happens AFTER the file handle is closed.
+struct FileCleanupGuard {
+    file_path: PathBuf,
+}
+
+impl Drop for FileCleanupGuard {
+    fn drop(&mut self) {
+        // File handle is already closed (this guard drops after the file field)
+        if let Err(e) = std::fs::remove_file(&self.file_path) {
+            log::warn!(
+                "Failed to delete remote log file {}: {}",
+                self.file_path.display(),
+                e
+            );
+        } else {
+            log::debug!("Deleted remote log file: {}", self.file_path.display());
+        }
+    }
+}
+
+/// File-backed log record source.
+/// Used for remote log segments downloaded to local disk.
+/// Streams data on-demand instead of loading entire file into memory.
+///
+/// Uses seek + read_exact for cross-platform compatibility.
+/// Access pattern is sequential iteration (single consumer).
+struct FileSource {
+    file: File,
+    file_size: usize,
+    base_offset: usize,
+    _cleanup: Option<FileCleanupGuard>, // Drops AFTER file (field order matters!)
+}
+
+impl FileSource {
+    /// Create a new FileSource without automatic cleanup.
+    pub fn new(file: File, base_offset: usize) -> Result<Self> {

Review Comment:
   nit:
   Seems `new` is only called in testing. So, can we delete this method and rename `new_with_cleanup` to `new`?
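   For illustration, the surviving constructor would then read (body taken from today's `new_with_cleanup`; only the name changes):
   ```
   /// Create a new FileSource that deletes the file when dropped.
   fn new(file: File, base_offset: usize, file_path: PathBuf) -> Result<Self> {
       let file_size = file.metadata()?.len() as usize;

       // Validate base_offset to prevent underflow in total_size()
       if base_offset > file_size {
           return Err(Error::UnexpectedError {
               message: format!(
                   "base_offset ({}) exceeds file_size ({})",
                   base_offset, file_size
               ),
               source: None,
           });
       }

       Ok(Self {
           file,
           file_size,
           base_offset,
           _cleanup: Some(FileCleanupGuard { file_path }),
       })
   }
   ```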



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -441,53 +492,316 @@ pub trait ToArrow {
     fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()>;
 }
 
-pub struct LogRecordsBatches {
+/// In-memory log record source.
+/// Used for local tablet server fetches (existing path).
+struct MemorySource {
     data: Bytes,
+}
+
+impl MemorySource {
+    fn new(data: Vec<u8>) -> Self {
+        Self {
+            data: Bytes::from(data),
+        }
+    }
+
+    fn read_batch_header(&mut self, pos: usize) -> Result<(i64, usize)> {
+        if pos + LOG_OVERHEAD > self.data.len() {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "Position {} + LOG_OVERHEAD {} exceeds data size {}",
+                    pos,
+                    LOG_OVERHEAD,
+                    self.data.len()
+                ),
+                source: None,
+            });
+        }
+
+        let base_offset = LittleEndian::read_i64(&self.data[pos + BASE_OFFSET_OFFSET..]);
+        let batch_size_bytes = LittleEndian::read_i32(&self.data[pos + LENGTH_OFFSET..]);
+
+        // Validate batch size to prevent integer overflow and corruption
+        let batch_size = validate_batch_size(batch_size_bytes)?;
+
+        Ok((base_offset, batch_size))
+    }
+
+    fn read_batch_data(&mut self, pos: usize, size: usize) -> Result<Bytes> {
+        if pos + size > self.data.len() {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "Read beyond data size: {} + {} > {}",
+                    pos,
+                    size,
+                    self.data.len()
+                ),
+                source: None,
+            });
+        }
+        // Zero-copy slice (Bytes is Arc-based)
+        Ok(self.data.slice(pos..pos + size))
+    }
+
+    fn total_size(&self) -> usize {
+        self.data.len()
+    }
+}
+
+/// RAII guard that deletes a file when dropped.
+/// Used to ensure file deletion happens AFTER the file handle is closed.
+struct FileCleanupGuard {
+    file_path: PathBuf,
+}
+
+impl Drop for FileCleanupGuard {
+    fn drop(&mut self) {
+        // File handle is already closed (this guard drops after the file field)
+        if let Err(e) = std::fs::remove_file(&self.file_path) {
+            log::warn!(
+                "Failed to delete remote log file {}: {}",
+                self.file_path.display(),
+                e
+            );
+        } else {
+            log::debug!("Deleted remote log file: {}", self.file_path.display());
+        }
+    }
+}
+
+/// File-backed log record source.
+/// Used for remote log segments downloaded to local disk.
+/// Streams data on-demand instead of loading entire file into memory.
+///
+/// Uses seek + read_exact for cross-platform compatibility.
+/// Access pattern is sequential iteration (single consumer).
+struct FileSource {
+    file: File,
+    file_size: usize,
+    base_offset: usize,
+    _cleanup: Option<FileCleanupGuard>, // Drops AFTER file (field order matters!)
+}
+
+impl FileSource {
+    /// Create a new FileSource without automatic cleanup.
+    pub fn new(file: File, base_offset: usize) -> Result<Self> {
+        let file_size = file.metadata()?.len() as usize;
+
+        // Validate base_offset to prevent underflow in total_size()
+        if base_offset > file_size {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "base_offset ({}) exceeds file_size ({})",
+                    base_offset, file_size
+                ),
+                source: None,
+            });
+        }
+
+        Ok(Self {
+            file,
+            file_size,
+            base_offset,
+            _cleanup: None,
+        })
+    }
+
+    /// Create a new FileSource that will delete the file when dropped.
+    /// This is used for remote log files that need cleanup.
+    fn new_with_cleanup(file: File, base_offset: usize, file_path: PathBuf) -> Result<Self> {
+        let file_size = file.metadata()?.len() as usize;
+
+        // Validate base_offset to prevent underflow in total_size()
+        if base_offset > file_size {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "base_offset ({}) exceeds file_size ({})",
+                    base_offset, file_size
+                ),
+                source: None,
+            });
+        }
+
+        Ok(Self {
+            file,
+            file_size,
+            base_offset,
+            _cleanup: Some(FileCleanupGuard { file_path }),
+        })
+    }
+
+    /// Read data at a specific position using seek + read_exact.
+    /// This is cross-platform and adequate for sequential access patterns.
+    fn read_at(&mut self, pos: u64, buf: &mut [u8]) -> Result<()> {
+        self.file.seek(SeekFrom::Start(pos))?;
+        self.file.read_exact(buf)?;
+        Ok(())
+    }
+
+    fn read_batch_header(&mut self, pos: usize) -> Result<(i64, usize)> {
+        let actual_pos = self.base_offset + pos;
+        if actual_pos + LOG_OVERHEAD > self.file_size {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "Position {} exceeds file size {}",
+                    actual_pos, self.file_size
+                ),
+                source: None,
+            });
+        }
+
+        // Read only the header to extract base_offset and batch_size
+        let mut header_buf = vec![0u8; LOG_OVERHEAD];
+        self.read_at(actual_pos as u64, &mut header_buf)?;
+
+        let base_offset = LittleEndian::read_i64(&header_buf[BASE_OFFSET_OFFSET..]);
+        let batch_size_bytes = LittleEndian::read_i32(&header_buf[LENGTH_OFFSET..]);
+
+        // Validate batch size to prevent integer overflow and corruption
+        let batch_size = validate_batch_size(batch_size_bytes)?;
+
+        Ok((base_offset, batch_size))
+    }
+
+    fn read_batch_data(&mut self, pos: usize, size: usize) -> Result<Bytes> {
+        let actual_pos = self.base_offset + pos;
+        if actual_pos + size > self.file_size {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "Read beyond file size: {} + {} > {}",
+                    actual_pos, size, self.file_size
+                ),
+                source: None,
+            });
+        }
+
+        // Read the full batch data
+        let mut batch_buf = vec![0u8; size];
+        self.read_at(actual_pos as u64, &mut batch_buf)?;
+
+        Ok(Bytes::from(batch_buf))
+    }
+
+    fn total_size(&self) -> usize {
+        self.file_size - self.base_offset
+    }
+}
+
+/// Enum for different log record sources.
+enum LogRecordsSource {
+    Memory(MemorySource),
+    File(FileSource),
+}
+
+impl LogRecordsSource {
+    fn read_batch_header(&mut self, pos: usize) -> Result<(i64, usize)> {
+        match self {
+            Self::Memory(s) => s.read_batch_header(pos),
+            Self::File(s) => s.read_batch_header(pos),
+        }
+    }
+
+    fn read_batch_data(&mut self, pos: usize, size: usize) -> Result<Bytes> {
+        match self {
+            Self::Memory(s) => s.read_batch_data(pos, size),
+            Self::File(s) => s.read_batch_data(pos, size),
+        }
+    }
+
+    fn total_size(&self) -> usize {
+        match self {
+            Self::Memory(s) => s.total_size(),
+            Self::File(s) => s.total_size(),
+        }
+    }
+}
+
+pub struct LogRecordsBatches {
+    source: LogRecordsSource,
     current_pos: usize,
     remaining_bytes: usize,
 }
 
 impl LogRecordsBatches {
+    /// Create from in-memory Vec (existing path - backward compatible).
     pub fn new(data: Vec<u8>) -> Self {
-        let remaining_bytes: usize = data.len();
+        let source = LogRecordsSource::Memory(MemorySource::new(data));
+        let remaining_bytes = source.total_size();
         Self {
-            data: Bytes::from(data),
+            source,
             current_pos: 0,
             remaining_bytes,
         }
     }
 
-    pub fn next_batch_size(&self) -> Option<usize> {
+    /// Create from file.
+    /// Enables streaming without loading entire file into memory.
+    pub fn from_file(file: File, base_offset: usize) -> Result<Self> {

Review Comment:
   nit:
   delete `from_file` and rename `from_file_with_cleanup` to `from_file`?
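   For illustration, a hypothetical call site after the rename (the three-argument signature is an assumption, mirroring `FileSource::new_with_cleanup(file, base_offset, file_path)`):
   ```
   // Illustrative only: a single constructor on the remote-log read path.
   let file = File::open(&path)?;
   let batches = LogRecordsBatches::from_file(file, base_offset, path.clone())?;
   // The backing file is removed by FileCleanupGuard once `batches` is dropped.
   ```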



##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -172,40 +695,119 @@ impl RemoteLogDownloadFuture {
         self.result.lock().is_some()
     }
 
-    /// Get the downloaded file path (synchronous, only works after is_done() returns true)
-    pub fn get_remote_log_bytes(&self) -> Result<Vec<u8>> {
-        // todo: handle download fail
-        let guard = self.result.lock();
-        match guard.as_ref() {
-            Some(Ok(path)) => Ok(path.clone()),
-            Some(Err(e)) => Err(Error::IoUnexpectedError {
-                message: format!("Fail to get remote log bytes: {e}"),
-                source: io::Error::other(format!("{e:?}")),
-            }),
+    /// Take the RemoteLogFile (including the permit) from this future
+    /// This should only be called when the download is complete
+    /// This is the correct way to consume the download - it transfers permit ownership
+    pub fn take_remote_log_file(&self) -> Result<RemoteLogFile> {
+        let mut guard = self.result.lock();
+        match guard.take() {
+            Some(Ok(remote_log_file)) => Ok(remote_log_file),
+            Some(Err(e)) => {
+                let error_msg = format!("{e}");
+                Err(Error::IoUnexpectedError {
+                    message: format!("Fail to get remote log file: 
{error_msg}"),
+                    source: io::Error::other(error_msg),
+                })
+            }
             None => Err(Error::IoUnexpectedError {
-                message: "Get remote log bytes not completed yet".to_string(),
-                source: io::Error::other("Get remote log bytes not completed 
yet"),
+                message: "Remote log file already taken or not 
ready".to_string(),
+                source: io::Error::other("Remote log file already taken or not 
ready"),
             }),
         }
     }
 }
 
 /// Downloader for remote log segment files
 pub struct RemoteLogDownloader {
-    local_log_dir: TempDir,
-    remote_fs_props: RwLock<HashMap<String, String>>,
+    request_sender: Option<mpsc::UnboundedSender<RemoteLogDownloadRequest>>,
+    remote_fs_props: Option<Arc<RwLock<HashMap<String, String>>>>,
+    /// Handle to the coordinator task. Used for graceful shutdown via shutdown() method.
+    #[allow(dead_code)]
+    coordinator_handle: Option<tokio::task::JoinHandle<()>>,

Review Comment:
   Seems it's also not used; is that expected?
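   For context, awaiting the handle in `shutdown` is what would make this field live. A minimal sketch (assuming the fields stay `Option`s, and presumably close to what the PR's `shutdown` already does): closing the sender ends `coordinator_loop`, and awaiting the handle waits for it to drain.
   ```
   pub async fn shutdown(mut self) {
       // Dropping the sender closes the channel, so coordinator_loop exits
       // after finishing in-flight work.
       drop(self.request_sender.take());
       if let Some(handle) = self.coordinator_handle.take() {
           let _ = handle.await; // ignore JoinError during shutdown
       }
   }
   ```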



##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -172,40 +695,119 @@ impl RemoteLogDownloadFuture {
         self.result.lock().is_some()
     }
 
-    /// Get the downloaded file path (synchronous, only works after is_done() returns true)
-    pub fn get_remote_log_bytes(&self) -> Result<Vec<u8>> {
-        // todo: handle download fail
-        let guard = self.result.lock();
-        match guard.as_ref() {
-            Some(Ok(path)) => Ok(path.clone()),
-            Some(Err(e)) => Err(Error::IoUnexpectedError {
-                message: format!("Fail to get remote log bytes: {e}"),
-                source: io::Error::other(format!("{e:?}")),
-            }),
+    /// Take the RemoteLogFile (including the permit) from this future
+    /// This should only be called when the download is complete
+    /// This is the correct way to consume the download - it transfers permit ownership
+    pub fn take_remote_log_file(&self) -> Result<RemoteLogFile> {
+        let mut guard = self.result.lock();
+        match guard.take() {
+            Some(Ok(remote_log_file)) => Ok(remote_log_file),
+            Some(Err(e)) => {
+                let error_msg = format!("{e}");
+                Err(Error::IoUnexpectedError {
+                    message: format!("Fail to get remote log file: 
{error_msg}"),
+                    source: io::Error::other(error_msg),
+                })
+            }
             None => Err(Error::IoUnexpectedError {
-                message: "Get remote log bytes not completed yet".to_string(),
-                source: io::Error::other("Get remote log bytes not completed 
yet"),
+                message: "Remote log file already taken or not 
ready".to_string(),
+                source: io::Error::other("Remote log file already taken or not 
ready"),
             }),
         }
     }
 }
 
 /// Downloader for remote log segment files
 pub struct RemoteLogDownloader {
-    local_log_dir: TempDir,
-    remote_fs_props: RwLock<HashMap<String, String>>,
+    request_sender: Option<mpsc::UnboundedSender<RemoteLogDownloadRequest>>,
+    remote_fs_props: Option<Arc<RwLock<HashMap<String, String>>>>,
+    /// Handle to the coordinator task. Used for graceful shutdown via shutdown() method.
+    #[allow(dead_code)]
+    coordinator_handle: Option<tokio::task::JoinHandle<()>>,
 }
 
 impl RemoteLogDownloader {
-    pub fn new(local_log_dir: TempDir) -> Result<Self> {
+    pub fn new(
+        local_log_dir: TempDir,
+        max_prefetch_segments: usize,
+        max_concurrent_downloads: usize,
+    ) -> Result<Self> {
+        let remote_fs_props = Arc::new(RwLock::new(HashMap::new()));
+        let fetcher = Arc::new(ProductionFetcher {
+            remote_fs_props: remote_fs_props.clone(),
+            local_log_dir: Arc::new(local_log_dir),
+        });
+
+        let (request_sender, request_receiver) = mpsc::unbounded_channel();
+
+        let coordinator = DownloadCoordinator {
+            download_queue: BinaryHeap::new(),
+            active_downloads: JoinSet::new(),
+            in_flight: 0,
+            prefetch_semaphore: Arc::new(Semaphore::new(max_prefetch_segments)),
+            max_concurrent_downloads,
+            recycle_notify: Arc::new(Notify::new()),
+            fetcher,
+        };
+
+        let coordinator_handle = tokio::spawn(coordinator_loop(coordinator, request_receiver));
+
         Ok(Self {
-            local_log_dir,
-            remote_fs_props: RwLock::new(HashMap::new()),
+            request_sender: Some(request_sender),
+            remote_fs_props: Some(remote_fs_props),
+            coordinator_handle: Some(coordinator_handle),
         })
     }
 
+    /// Create a RemoteLogDownloader with a custom fetcher (for testing)
+    /// The remote_fs_props will be None since custom fetchers typically don't need S3 credentials
+    #[cfg(test)]
+    pub fn new_with_fetcher(
+        fetcher: Arc<dyn RemoteLogFetcher>,
+        max_prefetch_segments: usize,
+        max_concurrent_downloads: usize,
+    ) -> Result<Self> {
+        let (request_sender, request_receiver) = mpsc::unbounded_channel();
+
+        let coordinator = DownloadCoordinator {
+            download_queue: BinaryHeap::new(),
+            active_downloads: JoinSet::new(),
+            in_flight: 0,
+            prefetch_semaphore: Arc::new(Semaphore::new(max_prefetch_segments)),
+            max_concurrent_downloads,
+            recycle_notify: Arc::new(Notify::new()),
+            fetcher,
+        };
+
+        let coordinator_handle = tokio::spawn(coordinator_loop(coordinator, request_receiver));
+
+        Ok(Self {
+            request_sender: Some(request_sender),
+            remote_fs_props: None,
+            coordinator_handle: Some(coordinator_handle),
+        })
+    }
+
+    /// Gracefully shutdown the downloader
+    /// Closes the request channel and waits for coordinator to finish pending work
+    ///
+    /// Note: This consumes self to prevent use-after-shutdown
+    #[allow(dead_code)]
+    pub async fn shutdown(mut self) {

Review Comment:
   Seems this method is not called; is that expected?
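   For illustration, a hypothetical call site (the owning `close` function here is an assumption, not code from this PR):
   ```
   // Illustrative only: the owner invokes shutdown() when the table/connection
   // closes, rather than letting Drop abort the coordinator task mid-download.
   async fn close(downloader: RemoteLogDownloader) {
       downloader.shutdown().await; // consumes self, preventing use-after-shutdown
   }
   ```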



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
