leekeiabstraction commented on code in PR #187:
URL: https://github.com/apache/fluss-rust/pull/187#discussion_r2713405769
##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -88,17 +146,473 @@ impl RemoteLogFetchInfo {
}
}
/// RAII guard for a prefetch permit that notifies the coordinator on drop.
///
/// Dropping this guard releases the held semaphore permit and then signals
/// `recycle_notify` (see the `Drop` impl).
///
/// NOTE: File deletion is now handled by `FileSource::drop()`, not here.
/// This ensures the file is closed before deletion.
#[derive(Debug)]
pub struct PrefetchPermit {
    /// Held semaphore slot; `Option` so `Drop` can release it explicitly
    /// before notifying (taken out with `Option::take`).
    permit: Option<OwnedSemaphorePermit>,
    /// Wakes the coordinator once the permit has been returned.
    recycle_notify: Arc<Notify>,
}
+
+impl PrefetchPermit {
+ fn new(permit: OwnedSemaphorePermit, recycle_notify: Arc<Notify>) -> Self {
+ Self {
+ permit: Some(permit),
+ recycle_notify,
+ }
+ }
+}
+
+impl Drop for PrefetchPermit {
+ fn drop(&mut self) {
+ // Release capacity (critical: permit must be dropped before notify)
+ let _ = self.permit.take(); // drops permit here
+
+ // Then wake coordinator so it can acquire the now-available permit
+ self.recycle_notify.notify_one();
+ }
+}
+
/// Downloaded remote log file with prefetch permit.
///
/// File remains on disk for memory efficiency; file deletion is handled by
/// `FileCleanupGuard` in `FileSource`, not by this type.
#[derive(Debug)]
pub struct RemoteLogFile {
    /// Path to the downloaded file on local disk
    pub file_path: PathBuf,
    /// Size of the file in bytes
    /// Currently unused but kept for potential future use (logging, metrics, etc.)
    #[allow(dead_code)] // retained for future logging/metrics use
    pub file_size: usize,
    /// RAII permit that releases prefetch semaphore slot and notifies
    /// coordinator when dropped
    pub permit: PrefetchPermit,
}
+
/// Represents a request to download a remote log segment with priority
/// ordering.
///
/// Ordering (see the `Ord` impl): same-bucket requests compare by
/// `start_offset`; cross-bucket requests compare by `max_timestamp`.
#[derive(Debug)]
pub struct RemoteLogDownloadRequest {
    /// Segment to be downloaded.
    segment: RemoteLogSegment,
    /// Remote directory holding this tablet's log files.
    remote_log_tablet_dir: String,
    /// One-shot channel over which the download result (or error) is sent.
    result_sender: oneshot::Sender<Result<RemoteLogFile>>,
    // Presumably incremented per failed attempt by the fetcher — confirm
    // against the download loop (not visible in this chunk).
    retry_count: u32,
    /// Earliest instant at which the next retry may run; `None` when no
    /// retry is scheduled.
    next_retry_at: Option<tokio::time::Instant>,
}
+
impl RemoteLogDownloadRequest {
    /// Get the segment (used by test fetcher implementations).
    /// Compiled only for the test harness, so it does not widen the
    /// production API surface.
    #[cfg(test)]
    pub fn segment(&self) -> &RemoteLogSegment {
        &self.segment
    }
}
+
+// Total ordering for priority queue (Rust requirement: cmp==Equal implies Eq)
+// Primary: Java semantics (timestamp cross-bucket, offset within-bucket)
+// Tie-breakers: table_bucket fields (table_id, partition_id, bucket_id), then
segment_id
+impl Ord for RemoteLogDownloadRequest {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ if self.segment.table_bucket == other.segment.table_bucket {
+ // Same bucket: order by start_offset (ascending - earlier
segments first)
+ self.segment
+ .start_offset
+ .cmp(&other.segment.start_offset)
+ .then_with(||
self.segment.segment_id.cmp(&other.segment.segment_id))
+ } else {
+ // Different buckets: order by max_timestamp (ascending - older
segments first)
+ // Then by table_bucket fields for true total ordering
+ self.segment
+ .max_timestamp
+ .cmp(&other.segment.max_timestamp)
+ .then_with(|| {
+ self.segment
+ .table_bucket
+ .table_id()
+ .cmp(&other.segment.table_bucket.table_id())
+ })
+ .then_with(|| {
+ self.segment
+ .table_bucket
+ .partition_id()
+ .cmp(&other.segment.table_bucket.partition_id())
+ })
+ .then_with(|| {
+ self.segment
+ .table_bucket
+ .bucket_id()
+ .cmp(&other.segment.table_bucket.bucket_id())
+ })
+ .then_with(||
self.segment.segment_id.cmp(&other.segment.segment_id))
Review Comment:
Agreed. Timestamp matching is rather unlikely in the first place anyway.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]