zhaohaidao commented on code in PR #187:
URL: https://github.com/apache/fluss-rust/pull/187#discussion_r2711129145


##########
crates/fluss/src/record/arrow.rs:
##########
@@ -373,17 +421,253 @@ pub trait ToArrow {
     fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()>;
 }
 
-pub struct LogRecordsBatches {
+/// Abstract source of log record data.
+/// Allows streaming from files or in-memory buffers.
+pub trait LogRecordsSource: Send + Sync {
+    /// Read batch header at given position.
+    /// Returns (base_offset, batch_size) tuple.
+    fn read_batch_header(&self, pos: usize) -> Result<(i64, usize)>;
+
+    /// Read full batch data at given position with given size.
+    /// Returns Bytes that can be zero-copy sliced.
+    fn read_batch_data(&self, pos: usize, size: usize) -> Result<Bytes>;
+
+    /// Total size of the source in bytes.
+    fn total_size(&self) -> usize;
+}
+
+/// In-memory implementation of LogRecordsSource.
+/// Used for local tablet server fetches (existing path).
+pub struct MemorySource {
     data: Bytes,
+}
+
+impl MemorySource {
+    pub fn new(data: Vec<u8>) -> Self {
+        Self {
+            data: Bytes::from(data),
+        }
+    }
+}
+
+impl LogRecordsSource for MemorySource {
+    fn read_batch_header(&self, pos: usize) -> Result<(i64, usize)> {
+        if pos + LOG_OVERHEAD > self.data.len() {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "Position {} + LOG_OVERHEAD {} exceeds data size {}",
+                    pos,
+                    LOG_OVERHEAD,
+                    self.data.len()
+                ),
+                source: None,
+            });
+        }
+
+        let base_offset = LittleEndian::read_i64(&self.data[pos + BASE_OFFSET_OFFSET..]);
+        let batch_size_bytes = LittleEndian::read_i32(&self.data[pos + LENGTH_OFFSET..]);
+
+        // Validate batch size to prevent integer overflow and corruption
+        let batch_size = validate_batch_size(batch_size_bytes)?;
+
+        Ok((base_offset, batch_size))
+    }
+
+    fn read_batch_data(&self, pos: usize, size: usize) -> Result<Bytes> {
+        if pos + size > self.data.len() {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "Read beyond data size: {} + {} > {}",
+                    pos,
+                    size,
+                    self.data.len()
+                ),
+                source: None,
+            });
+        }
+        // Zero-copy slice (Bytes is Arc-based)
+        Ok(self.data.slice(pos..pos + size))
+    }
+
+    fn total_size(&self) -> usize {
+        self.data.len()
+    }
+}
+
+/// RAII guard that deletes a file when dropped.
+/// Used to ensure file deletion happens AFTER the file handle is closed.
+struct FileCleanupGuard {
+    file_path: std::path::PathBuf,
+}
+
+impl Drop for FileCleanupGuard {
+    fn drop(&mut self) {
+        // File handle is already closed (this guard drops after the file field)
+        if let Err(e) = std::fs::remove_file(&self.file_path) {
+            log::warn!(
+                "Failed to delete remote log file {}: {}",
+                self.file_path.display(),
+                e
+            );
+        } else {
+            log::debug!("Deleted remote log file: {}", self.file_path.display());
+        }
+    }
+}
+
+/// File-backed implementation of LogRecordsSource.
+/// Used for remote log segments downloaded to local disk.
+/// Streams data on-demand instead of loading entire file into memory.
+///
+/// Uses Mutex<File> with seek + read_exact for cross-platform compatibility.
+/// Access pattern is sequential iteration (single consumer), so mutex overhead is negligible.
+pub struct FileSource {
+    file: Mutex<std::fs::File>,
+    file_size: usize,
+    base_offset: usize,
+    _cleanup: Option<FileCleanupGuard>, // Drops AFTER file (field order matters!)
+}
+
+impl FileSource {
+    /// Create a new FileSource without automatic cleanup.
+    pub fn new(file: std::fs::File, base_offset: usize) -> Result<Self> {
+        let file_size = file.metadata()?.len() as usize;
+        Ok(Self {
+            file: Mutex::new(file),
+            file_size,
+            base_offset,
+            _cleanup: None,
+        })
+    }
+
+    /// Create a new FileSource that will delete the file when dropped.
+    /// This is used for remote log files that need cleanup.
+    pub fn new_with_cleanup(
+        file: std::fs::File,
+        base_offset: usize,
+        file_path: std::path::PathBuf,
+    ) -> Result<Self> {
+        let file_size = file.metadata()?.len() as usize;
+        Ok(Self {
+            file: Mutex::new(file),
+            file_size,
+            base_offset,
+            _cleanup: Some(FileCleanupGuard { file_path }),
+        })
+    }
+
+    /// Read data at a specific position using seek + read_exact.
+    /// This is cross-platform and adequate for sequential access patterns.
+    fn read_at(&self, pos: u64, buf: &mut [u8]) -> Result<()> {
+        use std::io::Read;
+        let mut file = self.file.lock();
+        file.seek(SeekFrom::Start(pos))?;
+        file.read_exact(buf)?;
+        Ok(())
+    }
+}
+
+impl LogRecordsSource for FileSource {
+    fn read_batch_header(&self, pos: usize) -> Result<(i64, usize)> {
+        let actual_pos = self.base_offset + pos;
+        if actual_pos + LOG_OVERHEAD > self.file_size {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "Position {} exceeds file size {}",
+                    actual_pos, self.file_size
+                ),
+                source: None,
+            });
+        }
+
+        // Read only the header to extract base_offset and batch_size
+        let mut header_buf = vec![0u8; LOG_OVERHEAD];
+        self.read_at(actual_pos as u64, &mut header_buf)?;
+
+        let base_offset = LittleEndian::read_i64(&header_buf[BASE_OFFSET_OFFSET..]);
+        let batch_size_bytes = LittleEndian::read_i32(&header_buf[LENGTH_OFFSET..]);
+
+        // Validate batch size to prevent integer overflow and corruption
+        let batch_size = validate_batch_size(batch_size_bytes)?;
+
+        Ok((base_offset, batch_size))
+    }
+
+    fn read_batch_data(&self, pos: usize, size: usize) -> Result<Bytes> {
+        let actual_pos = self.base_offset + pos;
+        if actual_pos + size > self.file_size {
+            return Err(Error::UnexpectedError {
+                message: format!(
+                    "Read beyond file size: {} + {} > {}",
+                    actual_pos, size, self.file_size
+                ),
+                source: None,
+            });
+        }
+
+        // Read the full batch data
+        let mut batch_buf = vec![0u8; size];
+        self.read_at(actual_pos as u64, &mut batch_buf)?;
+
+        Ok(Bytes::from(batch_buf))
+    }
+
+    fn total_size(&self) -> usize {
+        self.file_size - self.base_offset

Review Comment:
   
   Can we assume that `file_size` is always greater than or equal to `base_offset`?
   If not, what happens when the `usize` subtraction in `total_size` underflows (a panic in debug builds, wraparound in release)?
   If so, can we assert `base_offset <= file_size` in the constructor?
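   
   A minimal sketch of the check I have in mind, reusing the `Error::UnexpectedError` variant from this diff (untested; `new_with_cleanup` would need the same guard):
   ```rust
   pub fn new(file: std::fs::File, base_offset: usize) -> Result<Self> {
       let file_size = file.metadata()?.len() as usize;
       // Both fields are usize, so rejecting base_offset > file_size here
       // guarantees the subtraction in total_size() can never underflow.
       if base_offset > file_size {
           return Err(Error::UnexpectedError {
               message: format!(
                   "base_offset {} exceeds file size {}",
                   base_offset, file_size
               ),
               source: None,
           });
       }
       Ok(Self {
           file: Mutex::new(file),
           file_size,
           base_offset,
           _cleanup: None,
       })
   }
   ```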



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -410,14 +705,24 @@ impl Iterator for LogRecordsBatches {
     fn next(&mut self) -> Option<Self::Item> {
         match self.next_batch_size() {
             Some(batch_size) => {
-                let start = self.current_pos;
-                let end = start + batch_size;
-                // Since LogRecordsBatches owns the Vec<u8>, the slice is valid
-                // as long as the mutable reference exists, which is 'a
-                let record_batch = LogRecordBatch::new(self.data.slice(start..end));
-                self.current_pos += batch_size;
-                self.remaining_bytes -= batch_size;
-                Some(record_batch)
+                // Read full batch data on-demand
+                match self.source.read_batch_data(self.current_pos, batch_size) {
+                    Ok(data) => {
+                        let record_batch = LogRecordBatch::new(data);
+                        self.current_pos += batch_size;
+                        self.remaining_bytes -= batch_size;
+                        Some(record_batch)
+                    }
+                    Err(e) => {
+                        log::error!(
+                            "Failed to read batch data at pos {} size {}: {}",
+                            self.current_pos,
+                            batch_size,
+                            e
+                        );
+                        None

Review Comment:
   Data corruption or partial reads can be hidden here without surfacing errors.
   Can we propagate them instead, e.g. via a Result-returning iterator or an error state that is surfaced by CompletedFetch?
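   
   For illustration, a rough sketch of the Result-returning variant against the types in this diff (untested):
   ```rust
   impl Iterator for LogRecordsBatches {
       type Item = Result<LogRecordBatch>;
   
       fn next(&mut self) -> Option<Self::Item> {
           let batch_size = self.next_batch_size()?;
           match self.source.read_batch_data(self.current_pos, batch_size) {
               Ok(data) => {
                   self.current_pos += batch_size;
                   self.remaining_bytes -= batch_size;
                   Some(Ok(LogRecordBatch::new(data)))
               }
               // Propagated instead of logged: the consumer decides whether
               // a failed or partial read is fatal.
               Err(e) => Some(Err(e)),
           }
       }
   }
   ```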



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -394,13 +678,24 @@ impl LogRecordsBatches {
             return None;
         }
 
-        let batch_size_bytes =
-            LittleEndian::read_i32(self.data.get(self.current_pos + LENGTH_OFFSET..).unwrap());
-        let batch_size = batch_size_bytes as usize + LOG_OVERHEAD;
-        if batch_size > self.remaining_bytes {
-            return None;
+        // Read only header to get size (efficient!)
+        match self.source.read_batch_header(self.current_pos) {
+            Ok((_base_offset, batch_size)) => {
+                if batch_size > self.remaining_bytes {
+                    None
+                } else {
+                    Some(batch_size)
+                }
+            }
+            Err(e) => {
+                log::debug!(
+                    "Failed to read batch header at pos {}: {}",
+                    self.current_pos,
+                    e
+                );
+                None

Review Comment:
   Data corruption or partial reads can be hidden here without surfacing errors.
   Can we propagate them instead, e.g. via a Result-returning iterator or an error state that is surfaced by CompletedFetch?
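   
   Concretely, `next_batch_size` could return `Result<Option<usize>>` so that "no more complete batches" and "unreadable header" stop being the same value. A rough sketch (the end-of-data guard below is my assumption; it should mirror whatever the existing early return checks):
   ```rust
   fn next_batch_size(&mut self) -> Result<Option<usize>> {
       // Assumed guard: not enough bytes left to even hold a batch header.
       if self.remaining_bytes < LOG_OVERHEAD {
           return Ok(None);
       }
       // Header read failures now propagate instead of being logged away.
       let (_base_offset, batch_size) = self.source.read_batch_header(self.current_pos)?;
       if batch_size > self.remaining_bytes {
           // Trailing partial batch: still a normal stop, but now explicitly
           // distinct from an I/O error.
           return Ok(None);
       }
       Ok(Some(batch_size))
   }
   ```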



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
