fresh-borzoni commented on code in PR #187:
URL: https://github.com/apache/fluss-rust/pull/187#discussion_r2714700656
##########
crates/fluss/src/record/arrow.rs:
##########
@@ -373,53 +421,321 @@ pub trait ToArrow {
fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()>;
}
-pub struct LogRecordsBatches {
+/// Abstract source of log record data.
+/// Allows streaming from files or in-memory buffers.
+pub trait LogRecordsSource: Send + Sync {
+    /// Read the batch header at the given position.
+    /// Returns a `(base_offset, batch_size)` tuple.
+ fn read_batch_header(&self, pos: usize) -> Result<(i64, usize)>;
+
+    /// Read the full batch data at the given position with the given size.
+    /// Returns `Bytes` that can be sliced zero-copy.
+ fn read_batch_data(&self, pos: usize, size: usize) -> Result<Bytes>;
+
+ /// Total size of the source in bytes.
+ fn total_size(&self) -> usize;
+}
+
+/// In-memory implementation of LogRecordsSource.
+/// Used for local tablet server fetches (existing path).
+pub struct MemorySource {
data: Bytes,
+}
+
+impl MemorySource {
+ pub fn new(data: Vec<u8>) -> Self {
+ Self {
+ data: Bytes::from(data),
+ }
+ }
+}
+
+impl LogRecordsSource for MemorySource {
+ fn read_batch_header(&self, pos: usize) -> Result<(i64, usize)> {
+ if pos + LOG_OVERHEAD > self.data.len() {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "Position {} + LOG_OVERHEAD {} exceeds data size {}",
+ pos,
+ LOG_OVERHEAD,
+ self.data.len()
+ ),
+ source: None,
+ });
+ }
+
+        let base_offset = LittleEndian::read_i64(&self.data[pos + BASE_OFFSET_OFFSET..]);
+        let batch_size_bytes = LittleEndian::read_i32(&self.data[pos + LENGTH_OFFSET..]);
+
+ // Validate batch size to prevent integer overflow and corruption
+ let batch_size = validate_batch_size(batch_size_bytes)?;
+
+ Ok((base_offset, batch_size))
+ }
+
+ fn read_batch_data(&self, pos: usize, size: usize) -> Result<Bytes> {
+ if pos + size > self.data.len() {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "Read beyond data size: {} + {} > {}",
+ pos,
+ size,
+ self.data.len()
+ ),
+ source: None,
+ });
+ }
+ // Zero-copy slice (Bytes is Arc-based)
+ Ok(self.data.slice(pos..pos + size))
+ }
+
+ fn total_size(&self) -> usize {
+ self.data.len()
+ }
+}
+
+/// RAII guard that deletes a file when dropped.
+/// Used to ensure file deletion happens AFTER the file handle is closed.
+struct FileCleanupGuard {
+ file_path: std::path::PathBuf,
+}
+
+impl Drop for FileCleanupGuard {
+ fn drop(&mut self) {
+        // File handle is already closed (this guard drops after the file field)
+ if let Err(e) = std::fs::remove_file(&self.file_path) {
+ log::warn!(
+ "Failed to delete remote log file {}: {}",
+ self.file_path.display(),
+ e
+ );
+ } else {
+            log::debug!("Deleted remote log file: {}", self.file_path.display());
+ }
+ }
+}
+
+/// File-backed implementation of LogRecordsSource.
+/// Used for remote log segments downloaded to local disk.
+/// Streams data on-demand instead of loading entire file into memory.
+///
+/// Uses Mutex<File> with seek + read_exact for cross-platform compatibility.
+/// Access pattern is sequential iteration (single consumer), so mutex overhead is negligible.
+pub struct FileSource {
+ file: Mutex<std::fs::File>,
+ file_size: usize,
+ base_offset: usize,
+    _cleanup: Option<FileCleanupGuard>, // Drops AFTER file (field order matters!)
+}
+
+impl FileSource {
+ /// Create a new FileSource without automatic cleanup.
+ pub fn new(file: std::fs::File, base_offset: usize) -> Result<Self> {
+ let file_size = file.metadata()?.len() as usize;
+
+ // Validate base_offset to prevent underflow in total_size()
+ if base_offset > file_size {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "base_offset ({}) exceeds file_size ({})",
+ base_offset, file_size
+ ),
+ source: None,
+ });
+ }
+
+ Ok(Self {
+ file: Mutex::new(file),
+ file_size,
+ base_offset,
+ _cleanup: None,
+ })
+ }
+
+ /// Create a new FileSource that will delete the file when dropped.
+ /// This is used for remote log files that need cleanup.
+ pub fn new_with_cleanup(
+ file: std::fs::File,
+ base_offset: usize,
+ file_path: std::path::PathBuf,
+ ) -> Result<Self> {
+ let file_size = file.metadata()?.len() as usize;
+
+ // Validate base_offset to prevent underflow in total_size()
+ if base_offset > file_size {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "base_offset ({}) exceeds file_size ({})",
+ base_offset, file_size
+ ),
+ source: None,
+ });
+ }
+
+ Ok(Self {
+ file: Mutex::new(file),
+ file_size,
+ base_offset,
+ _cleanup: Some(FileCleanupGuard { file_path }),
+ })
+ }
+
+ /// Read data at a specific position using seek + read_exact.
+ /// This is cross-platform and adequate for sequential access patterns.
+ fn read_at(&self, pos: u64, buf: &mut [u8]) -> Result<()> {
+ use std::io::Read;
+ let mut file = self.file.lock();
+ file.seek(SeekFrom::Start(pos))?;
+ file.read_exact(buf)?;
+ Ok(())
+ }
+}
+
+impl LogRecordsSource for FileSource {
+ fn read_batch_header(&self, pos: usize) -> Result<(i64, usize)> {
+ let actual_pos = self.base_offset + pos;
+ if actual_pos + LOG_OVERHEAD > self.file_size {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "Position {} exceeds file size {}",
+ actual_pos, self.file_size
+ ),
+ source: None,
+ });
+ }
+
+ // Read only the header to extract base_offset and batch_size
+ let mut header_buf = vec![0u8; LOG_OVERHEAD];
+ self.read_at(actual_pos as u64, &mut header_buf)?;
+
+        let base_offset = LittleEndian::read_i64(&header_buf[BASE_OFFSET_OFFSET..]);
+        let batch_size_bytes = LittleEndian::read_i32(&header_buf[LENGTH_OFFSET..]);
+
+ // Validate batch size to prevent integer overflow and corruption
+ let batch_size = validate_batch_size(batch_size_bytes)?;
+
+ Ok((base_offset, batch_size))
+ }
+
+ fn read_batch_data(&self, pos: usize, size: usize) -> Result<Bytes> {
+ let actual_pos = self.base_offset + pos;
+ if actual_pos + size > self.file_size {
+ return Err(Error::UnexpectedError {
+ message: format!(
+ "Read beyond file size: {} + {} > {}",
+ actual_pos, size, self.file_size
+ ),
+ source: None,
+ });
+ }
+
+ // Read the full batch data
+ let mut batch_buf = vec![0u8; size];
+ self.read_at(actual_pos as u64, &mut batch_buf)?;
+
+ Ok(Bytes::from(batch_buf))
+ }
+
+ fn total_size(&self) -> usize {
+ self.file_size - self.base_offset
+ }
+}
+
+pub struct LogRecordsBatches {
+ source: Box<dyn LogRecordsSource>,
current_pos: usize,
remaining_bytes: usize,
}
impl LogRecordsBatches {
+ /// Create from in-memory Vec (existing path - backward compatible).
pub fn new(data: Vec<u8>) -> Self {
- let remaining_bytes: usize = data.len();
+ let remaining_bytes = data.len();
Self {
- data: Bytes::from(data),
+ source: Box::new(MemorySource::new(data)),
Review Comment:
Sure 👍
I guess there is no specific reason, it's just a modelling artefact.
Another way to model it is through an enum with enum dispatch, which would avoid the vtable call; the runtime is still dominated by other things, though, so the difference would probably be negligible.
I'll switch to the enum approach anyway, since I was also sloppy with the imports.
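For reference, here's a minimal sketch of what I mean by the enum-dispatch alternative (the `LogSource` name is hypothetical; it assumes the `MemorySource`/`FileSource` types from the hunk above):

```rust
/// Sketch only: a statically dispatched source enum that would replace
/// `Box<dyn LogRecordsSource>` in `LogRecordsBatches`.
pub enum LogSource {
    Memory(MemorySource),
    File(FileSource),
}

impl LogSource {
    /// Each method is a `match` over the variants, so calls compile to a
    /// branch instead of a vtable lookup.
    pub fn read_batch_header(&self, pos: usize) -> Result<(i64, usize)> {
        match self {
            LogSource::Memory(s) => s.read_batch_header(pos),
            LogSource::File(s) => s.read_batch_header(pos),
        }
    }

    pub fn read_batch_data(&self, pos: usize, size: usize) -> Result<Bytes> {
        match self {
            LogSource::Memory(s) => s.read_batch_data(pos, size),
            LogSource::File(s) => s.read_batch_data(pos, size),
        }
    }

    pub fn total_size(&self) -> usize {
        match self {
            LogSource::Memory(s) => s.total_size(),
            LogSource::File(s) => s.total_size(),
        }
    }
}
```

`LogRecordsBatches` would then hold `source: LogSource` directly, with no `Box` allocation. As said, I don't expect this to be measurable here; it's mostly about tidiness.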
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]