zhaohaidao commented on code in PR #187:
URL: https://github.com/apache/fluss-rust/pull/187#discussion_r2710833833


##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -88,17 +146,473 @@ impl RemoteLogFetchInfo {
     }
 }
 
+/// RAII guard for prefetch permit that notifies coordinator on drop
+///
+/// NOTE: File deletion is now handled by FileSource::drop(), not here.
+/// This ensures the file is closed before deletion
+#[derive(Debug)]
+pub struct PrefetchPermit {
+    permit: Option<OwnedSemaphorePermit>,
+    recycle_notify: Arc<Notify>,
+}
+
+impl PrefetchPermit {
+    fn new(permit: OwnedSemaphorePermit, recycle_notify: Arc<Notify>) -> Self {
+        Self {
+            permit: Some(permit),
+            recycle_notify,
+        }
+    }
+}
+
+impl Drop for PrefetchPermit {
+    fn drop(&mut self) {
+        // Release capacity (critical: permit must be dropped before notify)
+        let _ = self.permit.take(); // drops permit here
+
+        // Then wake coordinator so it can acquire the now-available permit
+        self.recycle_notify.notify_one();
+    }
+}
+
+/// Downloaded remote log file with prefetch permit
+/// File remains on disk for memory efficiency - deletion is handled by FileSource::drop(), not by the permit
+#[derive(Debug)]
+pub struct RemoteLogFile {
+    /// Path to the downloaded file on local disk
+    pub file_path: PathBuf,
+    /// Size of the file in bytes
+    /// Currently unused but kept for potential future use (logging, metrics, etc.)
+    #[allow(dead_code)]
+    pub file_size: usize,
+    /// RAII permit that releases prefetch capacity and notifies the coordinator when dropped
+    pub permit: PrefetchPermit,
+}
+
+/// Represents a request to download a remote log segment with priority ordering
+#[derive(Debug)]
+pub struct RemoteLogDownloadRequest {
+    segment: RemoteLogSegment,
+    remote_log_tablet_dir: String,
+    result_sender: oneshot::Sender<Result<RemoteLogFile>>,
+    retry_count: u32,
+    next_retry_at: Option<tokio::time::Instant>,
+}
+
+impl RemoteLogDownloadRequest {
+    /// Get the segment (used by test fetcher implementations)
+    #[cfg(test)]
+    pub fn segment(&self) -> &RemoteLogSegment {
+        &self.segment
+    }
+}
+
+// Total ordering for priority queue (Rust requirement: cmp==Equal implies Eq)
+// Primary: Java semantics (timestamp cross-bucket, offset within-bucket)
+// Tie-breakers: table_bucket fields (table_id, partition_id, bucket_id), then segment_id
+impl Ord for RemoteLogDownloadRequest {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        if self.segment.table_bucket == other.segment.table_bucket {
+            // Same bucket: order by start_offset (ascending - earlier segments first)
+            self.segment
+                .start_offset
+                .cmp(&other.segment.start_offset)
+                .then_with(|| self.segment.segment_id.cmp(&other.segment.segment_id))
+        } else {
+            // Different buckets: order by max_timestamp (ascending - older segments first)
+            // Then by table_bucket fields for true total ordering
+            self.segment
+                .max_timestamp
+                .cmp(&other.segment.max_timestamp)
+                .then_with(|| {
+                    self.segment
+                        .table_bucket
+                        .table_id()
+                        .cmp(&other.segment.table_bucket.table_id())
+                })
+                .then_with(|| {
+                    self.segment
+                        .table_bucket
+                        .partition_id()
+                        .cmp(&other.segment.table_bucket.partition_id())
+                })
+                .then_with(|| {
+                    self.segment
+                        .table_bucket
+                        .bucket_id()
+                        .cmp(&other.segment.table_bucket.bucket_id())
+                })
+                .then_with(|| self.segment.segment_id.cmp(&other.segment.segment_id))
+        }
+    }
+}
+
+impl PartialOrd for RemoteLogDownloadRequest {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl PartialEq for RemoteLogDownloadRequest {
+    fn eq(&self, other: &Self) -> bool {
+        self.cmp(other) == std::cmp::Ordering::Equal
+    }
+}
+
+impl Eq for RemoteLogDownloadRequest {}
+
+/// Result of a download task
+enum DownloadResult {
+    /// Successful download - deliver result to future
+    Success {
+        result: RemoteLogFile,
+        result_sender: oneshot::Sender<Result<RemoteLogFile>>,
+    },
+    /// Download failed - re-queue request for retry (Java pattern)
+    FailedRetry { request: RemoteLogDownloadRequest },
+    /// Download failed permanently after max retries - fail the future
+    FailedPermanently {
+        error: Error,
+        result_sender: oneshot::Sender<Result<RemoteLogFile>>,
+    },
+    /// Cancelled - don't deliver, don't re-queue
+    Cancelled,
+}
+
+/// Production implementation of RemoteLogFetcher that downloads from actual storage

Review Comment:
   What exactly does "Production" mean here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to