Copilot commented on code in PR #187:
URL: https://github.com/apache/fluss-rust/pull/187#discussion_r2710666666
##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -88,17 +146,473 @@ impl RemoteLogFetchInfo {
}
}
+/// RAII guard for prefetch permit that notifies coordinator on drop
+///
+/// NOTE: File deletion is now handled by FileSource::drop(), not here.
+/// This ensures the file is closed before deletion
+#[derive(Debug)]
+pub struct PrefetchPermit {
+ permit: Option<OwnedSemaphorePermit>,
+ recycle_notify: Arc<Notify>,
+}
+
+impl PrefetchPermit {
+ fn new(permit: OwnedSemaphorePermit, recycle_notify: Arc<Notify>) -> Self {
+ Self {
+ permit: Some(permit),
+ recycle_notify,
+ }
+ }
+}
+
+impl Drop for PrefetchPermit {
+ fn drop(&mut self) {
+ // Release capacity (critical: permit must be dropped before notify)
+ let _ = self.permit.take(); // drops permit here
+
+ // Then wake coordinator so it can acquire the now-available permit
+ self.recycle_notify.notify_one();
+ }
+}
+
+/// Downloaded remote log file with prefetch permit
+/// File remains on disk for memory efficiency - permit cleanup deletes it
+#[derive(Debug)]
+pub struct RemoteLogFile {
+ /// Path to the downloaded file on local disk
+ pub file_path: PathBuf,
+ /// Size of the file in bytes
+    /// Currently unused but kept for potential future use (logging, metrics, etc.)
+ #[allow(dead_code)]
+ pub file_size: usize,
+ /// RAII permit that will delete the file when dropped
+ pub permit: PrefetchPermit,
Review Comment:
The documentation comments are misleading. Line 179 states "permit cleanup
deletes it" and line 188 says "RAII permit that will delete the file when
dropped", but PrefetchPermit's Drop implementation only releases the semaphore
permit and notifies the coordinator - it doesn't delete the file. The actual
file deletion is handled by FileCleanupGuard in FileSource (arrow.rs), as noted
in the PrefetchPermit struct's documentation at lines 150-152. Update these
comments to accurately reflect that the permit manages the prefetch semaphore
and notification, not file deletion.
##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -644,6 +645,179 @@ impl CompletedFetch for DefaultCompletedFetch {
}
}
+/// Completed fetch for remote log segments
+/// Matches Java's RemoteCompletedFetch design - separate class for remote vs local
+/// Holds RAII permit until consumed (data is in inner)
+pub struct RemoteCompletedFetch {
+ inner: DefaultCompletedFetch,
+ permit: Option<crate::client::table::remote_log::PrefetchPermit>,
+}
+
+impl RemoteCompletedFetch {
+ pub fn new(
+ inner: DefaultCompletedFetch,
+ permit: crate::client::table::remote_log::PrefetchPermit,
+ ) -> Self {
+ Self {
+ inner,
+ permit: Some(permit),
+ }
+ }
+}
+
+impl CompletedFetch for RemoteCompletedFetch {
+ fn table_bucket(&self) -> &TableBucket {
+ self.inner.table_bucket()
+ }
+
+ fn api_error(&self) -> Option<&ApiError> {
+ self.inner.api_error()
+ }
+
+ fn fetch_error_context(&self) -> Option<&FetchErrorContext> {
+ self.inner.fetch_error_context()
+ }
+
+ fn take_error(&mut self) -> Option<Error> {
+ self.inner.take_error()
+ }
+
+    fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>> {
+ self.inner.fetch_records(max_records)
+ }
+
+    fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>> {
+ self.inner.fetch_batches(max_batches)
+ }
+
+ fn is_consumed(&self) -> bool {
+ self.inner.is_consumed()
+ }
+
+ fn records_read(&self) -> usize {
+ self.inner.records_read()
+ }
+
+ fn drain(&mut self) {
+ self.inner.drain();
+ // Release permit immediately (don't wait for struct drop)
+        // Critical: allows prefetch to continue even if Box<dyn CompletedFetch> kept around
+ self.permit.take(); // drops permit here, triggers recycle notification
+ }
+
+ fn size_in_bytes(&self) -> usize {
+ self.inner.size_in_bytes()
+ }
+
+ fn high_watermark(&self) -> i64 {
+ self.inner.high_watermark()
+ }
+
+ fn is_initialized(&self) -> bool {
+ self.inner.is_initialized()
+ }
+
+ fn set_initialized(&mut self) {
+ self.inner.set_initialized()
+ }
+
+ fn next_fetch_offset(&self) -> i64 {
+ self.inner.next_fetch_offset()
+ }
+}
+// Permit released explicitly in drain() or automatically when struct drops
+
+/// Pending fetch that waits for remote log file to be downloaded
+pub struct RemotePendingFetch {
+ segment: crate::client::table::remote_log::RemoteLogSegment,
+ download_future: crate::client::table::remote_log::RemoteLogDownloadFuture,
+ pos_in_log_segment: i32,
+ fetch_offset: i64,
+ high_watermark: i64,
+ read_context: ReadContext,
+}
+
+impl RemotePendingFetch {
+ pub fn new(
+ segment: crate::client::table::remote_log::RemoteLogSegment,
+        download_future: crate::client::table::remote_log::RemoteLogDownloadFuture,
+ pos_in_log_segment: i32,
+ fetch_offset: i64,
+ high_watermark: i64,
+ read_context: ReadContext,
+ ) -> Self {
+ Self {
+ segment,
+ download_future,
+ pos_in_log_segment,
+ fetch_offset,
+ high_watermark,
+ read_context,
+ }
+ }
+}
+
+impl PendingFetch for RemotePendingFetch {
+ fn table_bucket(&self) -> &TableBucket {
+ &self.segment.table_bucket
+ }
+
+ fn is_completed(&self) -> bool {
+ self.download_future.is_done()
+ }
+
+ fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+ // Take the RemoteLogFile and destructure
+ let remote_log_file = self.download_future.take_remote_log_file()?;
+ let crate::client::table::remote_log::RemoteLogFile {
+ file_path,
+ file_size: _,
+ permit,
+ } = remote_log_file;
+
+ // Open file for streaming (no memory allocation for entire file)
+ let file = std::fs::File::open(&file_path)?;
+ let file_size = file.metadata()?.len() as usize;
+
+ // Create file-backed LogRecordsBatches with cleanup (streaming!)
+ // Data will be read batch-by-batch on-demand, not all at once
+ // FileSource will delete the file when dropped (after file is closed)
+ let log_record_batch = LogRecordsBatches::from_file_with_cleanup(
+ file,
+ self.pos_in_log_segment as usize,
+ file_path.clone(),
+ )?;
+
+ // Calculate size based on position offset
+ let size_in_bytes = if self.pos_in_log_segment > 0 {
+ let pos = self.pos_in_log_segment as usize;
+ if pos >= file_size {
+ return Err(Error::UnexpectedError {
+                message: format!("Position {} exceeds file size {}", pos, file_size),
+ source: None,
+ });
+ }
+ file_size - pos
+ } else {
+ file_size
+ };
+
+ // Create DefaultCompletedFetch
+ let inner_fetch = DefaultCompletedFetch::new(
+ self.segment.table_bucket.clone(),
+ log_record_batch,
+ size_in_bytes,
+ self.read_context,
+ self.fetch_offset,
+ self.high_watermark,
+ );
+
+ // Wrap it with RemoteCompletedFetch to hold the permit
+ // Permit will delete the file when dropped
Review Comment:
The comment "Permit will delete the file when dropped" is misleading. The
PrefetchPermit only releases the semaphore and notifies the coordinator. The
actual file deletion is handled by FileCleanupGuard in the FileSource
(arrow.rs). Update this comment to reflect that the permit manages the prefetch
slot, not file deletion.
```suggestion
        // Permit manages the prefetch slot (releases semaphore and notifies coordinator) when dropped;
        // file deletion is handled by the file-backed source created via from_file_with_cleanup
```
##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -88,17 +146,473 @@ impl RemoteLogFetchInfo {
}
}
+/// RAII guard for prefetch permit that notifies coordinator on drop
+///
+/// NOTE: File deletion is now handled by FileSource::drop(), not here.
+/// This ensures the file is closed before deletion
+#[derive(Debug)]
+pub struct PrefetchPermit {
+ permit: Option<OwnedSemaphorePermit>,
+ recycle_notify: Arc<Notify>,
+}
+
+impl PrefetchPermit {
+ fn new(permit: OwnedSemaphorePermit, recycle_notify: Arc<Notify>) -> Self {
+ Self {
+ permit: Some(permit),
+ recycle_notify,
+ }
+ }
+}
+
+impl Drop for PrefetchPermit {
+ fn drop(&mut self) {
+ // Release capacity (critical: permit must be dropped before notify)
+ let _ = self.permit.take(); // drops permit here
+
+ // Then wake coordinator so it can acquire the now-available permit
+ self.recycle_notify.notify_one();
+ }
+}
+
+/// Downloaded remote log file with prefetch permit
+/// File remains on disk for memory efficiency - permit cleanup deletes it
+#[derive(Debug)]
+pub struct RemoteLogFile {
+ /// Path to the downloaded file on local disk
+ pub file_path: PathBuf,
+ /// Size of the file in bytes
+    /// Currently unused but kept for potential future use (logging, metrics, etc.)
+ #[allow(dead_code)]
+ pub file_size: usize,
+ /// RAII permit that will delete the file when dropped
+ pub permit: PrefetchPermit,
+}
+
+/// Represents a request to download a remote log segment with priority ordering
+#[derive(Debug)]
+pub struct RemoteLogDownloadRequest {
+ segment: RemoteLogSegment,
+ remote_log_tablet_dir: String,
+ result_sender: oneshot::Sender<Result<RemoteLogFile>>,
+ retry_count: u32,
+ next_retry_at: Option<tokio::time::Instant>,
+}
+
+impl RemoteLogDownloadRequest {
+ /// Get the segment (used by test fetcher implementations)
+ #[cfg(test)]
+ pub fn segment(&self) -> &RemoteLogSegment {
+ &self.segment
+ }
+}
+
+// Total ordering for priority queue (Rust requirement: cmp==Equal implies Eq)
+// Primary: Java semantics (timestamp cross-bucket, offset within-bucket)
+// Tie-breakers: table_bucket fields (table_id, partition_id, bucket_id), then segment_id
+impl Ord for RemoteLogDownloadRequest {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ if self.segment.table_bucket == other.segment.table_bucket {
+            // Same bucket: order by start_offset (ascending - earlier segments first)
+ self.segment
+ .start_offset
+ .cmp(&other.segment.start_offset)
+                .then_with(|| self.segment.segment_id.cmp(&other.segment.segment_id))
+ } else {
+            // Different buckets: order by max_timestamp (ascending - older segments first)
+ // Then by table_bucket fields for true total ordering
+ self.segment
+ .max_timestamp
+ .cmp(&other.segment.max_timestamp)
+ .then_with(|| {
+ self.segment
+ .table_bucket
+ .table_id()
+ .cmp(&other.segment.table_bucket.table_id())
+ })
+ .then_with(|| {
+ self.segment
+ .table_bucket
+ .partition_id()
+ .cmp(&other.segment.table_bucket.partition_id())
+ })
+ .then_with(|| {
+ self.segment
+ .table_bucket
+ .bucket_id()
+ .cmp(&other.segment.table_bucket.bucket_id())
+ })
+                .then_with(|| self.segment.segment_id.cmp(&other.segment.segment_id))
+ }
+ }
+}
+
+impl PartialOrd for RemoteLogDownloadRequest {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+impl PartialEq for RemoteLogDownloadRequest {
+ fn eq(&self, other: &Self) -> bool {
+ self.cmp(other) == std::cmp::Ordering::Equal
+ }
+}
+
+impl Eq for RemoteLogDownloadRequest {}
+
+/// Result of a download task
+enum DownloadResult {
+ /// Successful download - deliver result to future
+ Success {
+ result: RemoteLogFile,
+ result_sender: oneshot::Sender<Result<RemoteLogFile>>,
+ },
+ /// Download failed - re-queue request for retry (Java pattern)
+ FailedRetry { request: RemoteLogDownloadRequest },
+ /// Download failed permanently after max retries - fail the future
+ FailedPermanently {
+ error: Error,
+ result_sender: oneshot::Sender<Result<RemoteLogFile>>,
+ },
+ /// Cancelled - don't deliver, don't re-queue
+ Cancelled,
+}
+
+/// Production implementation of RemoteLogFetcher that downloads from actual storage
+struct ProductionFetcher {
+ remote_fs_props: Arc<RwLock<HashMap<String, String>>>,
+ local_log_dir: Arc<TempDir>,
+}
+
+impl RemoteLogFetcher for ProductionFetcher {
+ fn fetch(
+ &self,
+ request: &RemoteLogDownloadRequest,
+ ) -> Pin<Box<dyn Future<Output = Result<FetchResult>> + Send>> {
+ let remote_fs_props = self.remote_fs_props.clone();
+ let local_log_dir = self.local_log_dir.clone();
+
+ // Clone data needed for async operation to avoid lifetime issues
+ let segment = request.segment.clone();
+ let remote_log_tablet_dir = request.remote_log_tablet_dir.to_string();
+
+ Box::pin(async move {
+ let local_file_name = segment.local_file_name();
+ let local_file_path = local_log_dir.path().join(&local_file_name);
+
+ // Build remote path
+ let offset_prefix = format!("{:020}", segment.start_offset);
+ let remote_path = format!(
+ "{}/{}/{}.log",
+ remote_log_tablet_dir, segment.segment_id, offset_prefix
+ );
+
+ let remote_fs_props_map = remote_fs_props.read().clone();
+
+ // Download file to disk (streaming, no memory spike)
+ let file_path = RemoteLogDownloader::download_file(
+ &remote_log_tablet_dir,
+ &remote_path,
+ &local_file_path,
+ &remote_fs_props_map,
+ )
+ .await?;
+
+ // Get file size
+ let metadata = tokio::fs::metadata(&file_path).await?;
+ let file_size = metadata.len() as usize;
+
+            // Return file path - file stays on disk until PrefetchPermit is dropped
+ Ok(FetchResult {
+ file_path,
+ file_size,
+ })
+ })
+ }
+}
+
+/// Coordinator that owns all download state and orchestrates downloads
+struct DownloadCoordinator {
+ download_queue: BinaryHeap<Reverse<RemoteLogDownloadRequest>>,
+ active_downloads: JoinSet<DownloadResult>,
+ in_flight: usize,
+ prefetch_semaphore: Arc<Semaphore>,
+ max_concurrent_downloads: usize,
+ recycle_notify: Arc<Notify>,
+ fetcher: Arc<dyn RemoteLogFetcher>,
+}
+
+impl DownloadCoordinator {
+ /// Check if we should wait for recycle notification
+ /// Only wait if we're blocked on permits AND have pending work
+ fn should_wait_for_recycle(&self) -> bool {
+ !self.download_queue.is_empty()
+ && self.in_flight < self.max_concurrent_downloads
+ && self.prefetch_semaphore.available_permits() == 0
+ }
+
+ /// Find the earliest retry deadline among pending requests
+ fn next_retry_deadline(&self) -> Option<tokio::time::Instant> {
+ self.download_queue
+ .iter()
+ .filter_map(|Reverse(req)| req.next_retry_at)
+ .min()
+ }
+}
+
+impl DownloadCoordinator {
+ /// Try to start as many downloads as possible (event-driven drain)
+ fn drain(&mut self) {
+ // Collect deferred requests (backoff not ready) to push back later
+ let mut deferred = Vec::new();
+        // Scan entire queue once to find ready requests (prevents head-of-line blocking)
+ // Bound to reasonable max to avoid excessive work if queue is huge
+ let max_scan = self.download_queue.len().min(100);
+ let mut scanned = 0;
+
+ while !self.download_queue.is_empty()
+ && self.in_flight < self.max_concurrent_downloads
+ && scanned < max_scan
Review Comment:
The max_scan limit of 100 could lead to head-of-line blocking if many
requests in the queue have deferred retry times. If the first 100 items are all
deferred, ready requests beyond position 100 won't be processed until the next
drain cycle. Consider either removing the limit (since the loop has other exit
conditions), or implementing a two-pass scan: first pass to collect deferred
items, second pass to find ready items up to the concurrency limit.
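A rough sketch of the two-pass variant, assuming only the DownloadCoordinator fields shown in this hunk; `spawn_download` is a hypothetical stand-in for the existing permit-acquire-and-spawn path and is assumed to bump `in_flight`:
```rust
    fn drain(&mut self) {
        let now = tokio::time::Instant::now();
        let mut deferred = Vec::new();
        let mut ready = Vec::new();

        // Pass 1: classify every queued request, so deferred retries can never
        // hide ready work behind them.
        while let Some(Reverse(req)) = self.download_queue.pop() {
            match req.next_retry_at {
                Some(at) if at > now => deferred.push(req),
                _ => ready.push(req),
            }
        }

        // Pass 2: start ready requests up to the concurrency limit; everything
        // else (including leftover ready items) goes back on the queue.
        for req in ready {
            if self.in_flight < self.max_concurrent_downloads {
                self.spawn_download(req); // hypothetical: existing spawn path
            } else {
                deferred.push(req);
            }
        }
        for req in deferred {
            self.download_queue.push(Reverse(req));
        }
    }
```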
##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -88,17 +146,473 @@ impl RemoteLogFetchInfo {
}
}
+/// RAII guard for prefetch permit that notifies coordinator on drop
+///
+/// NOTE: File deletion is now handled by FileSource::drop(), not here.
+/// This ensures the file is closed before deletion
+#[derive(Debug)]
+pub struct PrefetchPermit {
+ permit: Option<OwnedSemaphorePermit>,
+ recycle_notify: Arc<Notify>,
+}
+
+impl PrefetchPermit {
+ fn new(permit: OwnedSemaphorePermit, recycle_notify: Arc<Notify>) -> Self {
+ Self {
+ permit: Some(permit),
+ recycle_notify,
+ }
+ }
+}
+
+impl Drop for PrefetchPermit {
+ fn drop(&mut self) {
+ // Release capacity (critical: permit must be dropped before notify)
+ let _ = self.permit.take(); // drops permit here
+
+ // Then wake coordinator so it can acquire the now-available permit
+ self.recycle_notify.notify_one();
+ }
+}
+
+/// Downloaded remote log file with prefetch permit
+/// File remains on disk for memory efficiency - permit cleanup deletes it
+#[derive(Debug)]
+pub struct RemoteLogFile {
+ /// Path to the downloaded file on local disk
+ pub file_path: PathBuf,
+ /// Size of the file in bytes
+    /// Currently unused but kept for potential future use (logging, metrics, etc.)
+ #[allow(dead_code)]
+ pub file_size: usize,
+ /// RAII permit that will delete the file when dropped
+ pub permit: PrefetchPermit,
+}
+
+/// Represents a request to download a remote log segment with priority ordering
+#[derive(Debug)]
+pub struct RemoteLogDownloadRequest {
+ segment: RemoteLogSegment,
+ remote_log_tablet_dir: String,
+ result_sender: oneshot::Sender<Result<RemoteLogFile>>,
+ retry_count: u32,
+ next_retry_at: Option<tokio::time::Instant>,
+}
+
+impl RemoteLogDownloadRequest {
+ /// Get the segment (used by test fetcher implementations)
+ #[cfg(test)]
+ pub fn segment(&self) -> &RemoteLogSegment {
+ &self.segment
+ }
+}
+
+// Total ordering for priority queue (Rust requirement: cmp==Equal implies Eq)
+// Primary: Java semantics (timestamp cross-bucket, offset within-bucket)
+// Tie-breakers: table_bucket fields (table_id, partition_id, bucket_id), then segment_id
+impl Ord for RemoteLogDownloadRequest {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ if self.segment.table_bucket == other.segment.table_bucket {
+            // Same bucket: order by start_offset (ascending - earlier segments first)
+ self.segment
+ .start_offset
+ .cmp(&other.segment.start_offset)
+                .then_with(|| self.segment.segment_id.cmp(&other.segment.segment_id))
+ } else {
+            // Different buckets: order by max_timestamp (ascending - older segments first)
+ // Then by table_bucket fields for true total ordering
+ self.segment
+ .max_timestamp
+ .cmp(&other.segment.max_timestamp)
+ .then_with(|| {
+ self.segment
+ .table_bucket
+ .table_id()
+ .cmp(&other.segment.table_bucket.table_id())
+ })
+ .then_with(|| {
+ self.segment
+ .table_bucket
+ .partition_id()
+ .cmp(&other.segment.table_bucket.partition_id())
+ })
+ .then_with(|| {
+ self.segment
+ .table_bucket
+ .bucket_id()
+ .cmp(&other.segment.table_bucket.bucket_id())
+ })
+                .then_with(|| self.segment.segment_id.cmp(&other.segment.segment_id))
+ }
+ }
+}
+
+impl PartialOrd for RemoteLogDownloadRequest {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+impl PartialEq for RemoteLogDownloadRequest {
+ fn eq(&self, other: &Self) -> bool {
+ self.cmp(other) == std::cmp::Ordering::Equal
+ }
+}
+
+impl Eq for RemoteLogDownloadRequest {}
+
+/// Result of a download task
+enum DownloadResult {
+ /// Successful download - deliver result to future
+ Success {
+ result: RemoteLogFile,
+ result_sender: oneshot::Sender<Result<RemoteLogFile>>,
+ },
+ /// Download failed - re-queue request for retry (Java pattern)
+ FailedRetry { request: RemoteLogDownloadRequest },
+ /// Download failed permanently after max retries - fail the future
+ FailedPermanently {
+ error: Error,
+ result_sender: oneshot::Sender<Result<RemoteLogFile>>,
+ },
+ /// Cancelled - don't deliver, don't re-queue
+ Cancelled,
+}
+
+/// Production implementation of RemoteLogFetcher that downloads from actual storage
+struct ProductionFetcher {
+ remote_fs_props: Arc<RwLock<HashMap<String, String>>>,
+ local_log_dir: Arc<TempDir>,
+}
+
+impl RemoteLogFetcher for ProductionFetcher {
+ fn fetch(
+ &self,
+ request: &RemoteLogDownloadRequest,
+ ) -> Pin<Box<dyn Future<Output = Result<FetchResult>> + Send>> {
+ let remote_fs_props = self.remote_fs_props.clone();
+ let local_log_dir = self.local_log_dir.clone();
+
+ // Clone data needed for async operation to avoid lifetime issues
+ let segment = request.segment.clone();
+ let remote_log_tablet_dir = request.remote_log_tablet_dir.to_string();
+
+ Box::pin(async move {
+ let local_file_name = segment.local_file_name();
+ let local_file_path = local_log_dir.path().join(&local_file_name);
+
+ // Build remote path
+ let offset_prefix = format!("{:020}", segment.start_offset);
+ let remote_path = format!(
+ "{}/{}/{}.log",
+ remote_log_tablet_dir, segment.segment_id, offset_prefix
+ );
+
+ let remote_fs_props_map = remote_fs_props.read().clone();
+
+ // Download file to disk (streaming, no memory spike)
+ let file_path = RemoteLogDownloader::download_file(
+ &remote_log_tablet_dir,
+ &remote_path,
+ &local_file_path,
+ &remote_fs_props_map,
+ )
+ .await?;
+
Review Comment:
If download_file() fails after partially writing to local_file_path, the
incomplete file is not cleaned up. This can lead to orphaned temp files
accumulating in the temp directory. Consider wrapping the download in a guard
that deletes the file on failure, or using a unique temporary file name for
each attempt and only moving it to the final location on success.
```suggestion
let download_result = RemoteLogDownloader::download_file(
&remote_log_tablet_dir,
&remote_path,
&local_file_path,
&remote_fs_props_map,
)
.await;
let file_path = match download_result {
Ok(path) => path,
Err(e) => {
                    // Best-effort cleanup of any partially written local file on failure
let _ = tokio::fs::remove_file(&local_file_path).await;
return Err(e);
}
};
```
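For the second option mentioned above (unique temporary name, rename on success), a rough sketch with the same variables in scope; the temp-name scheme is illustrative only:
```rust
            // Download to a per-attempt temp path, then rename into place only on success.
            let tmp_path = local_file_path.with_extension(format!("part{}", segment.segment_id));
            match RemoteLogDownloader::download_file(
                &remote_log_tablet_dir,
                &remote_path,
                &tmp_path,
                &remote_fs_props_map,
            )
            .await
            {
                Ok(_) => {
                    // Rename is atomic within the same directory on most filesystems.
                    tokio::fs::rename(&tmp_path, &local_file_path).await?;
                }
                Err(e) => {
                    // Best-effort cleanup so partial files never accumulate.
                    let _ = tokio::fs::remove_file(&tmp_path).await;
                    return Err(e);
                }
            }
            let file_path = local_file_path.clone();
```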
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]