leekeiabstraction commented on code in PR #187:
URL: https://github.com/apache/fluss-rust/pull/187#discussion_r2712833711


##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -644,6 +647,180 @@ impl CompletedFetch for DefaultCompletedFetch {
     }
 }
 
+/// Completed fetch for remote log segments
+/// Matches Java's RemoteCompletedFetch design - separate class for remote vs local
+/// Holds RAII permit until consumed (data is in inner)
+pub struct RemoteCompletedFetch {
+    inner: DefaultCompletedFetch,
+    permit: Option<crate::client::table::remote_log::PrefetchPermit>,

Review Comment:
   nit: import instead of using full namespace
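
   For illustration, the suggested change might read roughly like this (a sketch only; it assumes `PrefetchPermit` is exported from the `remote_log` module, as the full path above suggests):

   ```rust
   use crate::client::table::remote_log::PrefetchPermit;

   pub struct RemoteCompletedFetch {
       inner: DefaultCompletedFetch,
       // same field as above, now using the imported type directly
       permit: Option<PrefetchPermit>,
   }
   ```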



##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -644,6 +647,180 @@ impl CompletedFetch for DefaultCompletedFetch {
     }
 }
 
+/// Completed fetch for remote log segments
+/// Matches Java's RemoteCompletedFetch design - separate class for remote vs local
+/// Holds RAII permit until consumed (data is in inner)
+pub struct RemoteCompletedFetch {
+    inner: DefaultCompletedFetch,
+    permit: Option<crate::client::table::remote_log::PrefetchPermit>,
+}
+
+impl RemoteCompletedFetch {
+    pub fn new(
+        inner: DefaultCompletedFetch,
+        permit: crate::client::table::remote_log::PrefetchPermit,

Review Comment:
   nit: import instead of using full namespace



##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -644,6 +647,180 @@ impl CompletedFetch for DefaultCompletedFetch {
     }
 }
 
+/// Completed fetch for remote log segments
+/// Matches Java's RemoteCompletedFetch design - separate class for remote vs local
+/// Holds RAII permit until consumed (data is in inner)
+pub struct RemoteCompletedFetch {
+    inner: DefaultCompletedFetch,
+    permit: Option<crate::client::table::remote_log::PrefetchPermit>,
+}
+
+impl RemoteCompletedFetch {
+    pub fn new(
+        inner: DefaultCompletedFetch,
+        permit: crate::client::table::remote_log::PrefetchPermit,
+    ) -> Self {
+        Self {
+            inner,
+            permit: Some(permit),
+        }
+    }
+}
+
+impl CompletedFetch for RemoteCompletedFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        self.inner.table_bucket()
+    }
+
+    fn api_error(&self) -> Option<&ApiError> {
+        self.inner.api_error()
+    }
+
+    fn fetch_error_context(&self) -> Option<&FetchErrorContext> {
+        self.inner.fetch_error_context()
+    }
+
+    fn take_error(&mut self) -> Option<Error> {
+        self.inner.take_error()
+    }
+
+    fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>> {
+        self.inner.fetch_records(max_records)
+    }
+
+    fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>> {
+        self.inner.fetch_batches(max_batches)
+    }
+
+    fn is_consumed(&self) -> bool {
+        self.inner.is_consumed()
+    }
+
+    fn records_read(&self) -> usize {
+        self.inner.records_read()
+    }
+
+    fn drain(&mut self) {
+        self.inner.drain();
+        // Release permit immediately (don't wait for struct drop)
+        // Critical: allows prefetch to continue even if the Box<dyn CompletedFetch> is kept around
+        self.permit.take(); // drops permit here, triggers recycle notification
+    }
+
+    fn size_in_bytes(&self) -> usize {
+        self.inner.size_in_bytes()
+    }
+
+    fn high_watermark(&self) -> i64 {
+        self.inner.high_watermark()
+    }
+
+    fn is_initialized(&self) -> bool {
+        self.inner.is_initialized()
+    }
+
+    fn set_initialized(&mut self) {
+        self.inner.set_initialized()
+    }
+
+    fn next_fetch_offset(&self) -> i64 {
+        self.inner.next_fetch_offset()
+    }
+}
+// Permit released explicitly in drain() or automatically when struct drops
+
+/// Pending fetch that waits for remote log file to be downloaded
+pub struct RemotePendingFetch {
+    segment: crate::client::table::remote_log::RemoteLogSegment,
+    download_future: crate::client::table::remote_log::RemoteLogDownloadFuture,
+    pos_in_log_segment: i32,
+    fetch_offset: i64,
+    high_watermark: i64,
+    read_context: ReadContext,
+}
+
+impl RemotePendingFetch {
+    pub fn new(
+        segment: crate::client::table::remote_log::RemoteLogSegment,
+        download_future: crate::client::table::remote_log::RemoteLogDownloadFuture,

Review Comment:
   nit: import instead of using full namespace
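
   As with the earlier comments, a sketch of how these fields might read with the types imported (assuming both are exported from the `remote_log` module):

   ```rust
   use crate::client::table::remote_log::{RemoteLogDownloadFuture, RemoteLogSegment};

   pub struct RemotePendingFetch {
       segment: RemoteLogSegment,
       download_future: RemoteLogDownloadFuture,
       pos_in_log_segment: i32,
       fetch_offset: i64,
       high_watermark: i64,
       read_context: ReadContext,
   }
   ```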



##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -644,6 +647,180 @@ impl CompletedFetch for DefaultCompletedFetch {
     }
 }
 
+/// Completed fetch for remote log segments
+/// Matches Java's RemoteCompletedFetch design - separate class for remote vs local
+/// Holds RAII permit until consumed (data is in inner)
+pub struct RemoteCompletedFetch {
+    inner: DefaultCompletedFetch,
+    permit: Option<crate::client::table::remote_log::PrefetchPermit>,
+}
+
+impl RemoteCompletedFetch {
+    pub fn new(
+        inner: DefaultCompletedFetch,
+        permit: crate::client::table::remote_log::PrefetchPermit,
+    ) -> Self {
+        Self {
+            inner,
+            permit: Some(permit),
+        }
+    }
+}
+
+impl CompletedFetch for RemoteCompletedFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        self.inner.table_bucket()
+    }
+
+    fn api_error(&self) -> Option<&ApiError> {
+        self.inner.api_error()
+    }
+
+    fn fetch_error_context(&self) -> Option<&FetchErrorContext> {
+        self.inner.fetch_error_context()
+    }
+
+    fn take_error(&mut self) -> Option<Error> {
+        self.inner.take_error()
+    }
+
+    fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>> {
+        self.inner.fetch_records(max_records)
+    }
+
+    fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>> {
+        self.inner.fetch_batches(max_batches)
+    }
+
+    fn is_consumed(&self) -> bool {
+        self.inner.is_consumed()
+    }
+
+    fn records_read(&self) -> usize {
+        self.inner.records_read()
+    }
+
+    fn drain(&mut self) {
+        self.inner.drain();
+        // Release permit immediately (don't wait for struct drop)
+        // Critical: allows prefetch to continue even if the Box<dyn CompletedFetch> is kept around
+        self.permit.take(); // drops permit here, triggers recycle notification
+    }
+
+    fn size_in_bytes(&self) -> usize {
+        self.inner.size_in_bytes()
+    }
+
+    fn high_watermark(&self) -> i64 {
+        self.inner.high_watermark()
+    }
+
+    fn is_initialized(&self) -> bool {
+        self.inner.is_initialized()
+    }
+
+    fn set_initialized(&mut self) {
+        self.inner.set_initialized()
+    }
+
+    fn next_fetch_offset(&self) -> i64 {
+        self.inner.next_fetch_offset()
+    }
+}
+// Permit released explicitly in drain() or automatically when struct drops
+
+/// Pending fetch that waits for remote log file to be downloaded
+pub struct RemotePendingFetch {
+    segment: crate::client::table::remote_log::RemoteLogSegment,
+    download_future: crate::client::table::remote_log::RemoteLogDownloadFuture,
+    pos_in_log_segment: i32,
+    fetch_offset: i64,
+    high_watermark: i64,
+    read_context: ReadContext,
+}
+
+impl RemotePendingFetch {
+    pub fn new(
+        segment: crate::client::table::remote_log::RemoteLogSegment,
+        download_future: crate::client::table::remote_log::RemoteLogDownloadFuture,
+        pos_in_log_segment: i32,
+        fetch_offset: i64,
+        high_watermark: i64,
+        read_context: ReadContext,
+    ) -> Self {
+        Self {
+            segment,
+            download_future,
+            pos_in_log_segment,
+            fetch_offset,
+            high_watermark,
+            read_context,
+        }
+    }
+}
+
+impl PendingFetch for RemotePendingFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        &self.segment.table_bucket
+    }
+
+    fn is_completed(&self) -> bool {
+        self.download_future.is_done()
+    }
+
+    fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+        // Take the RemoteLogFile and destructure
+        let remote_log_file = self.download_future.take_remote_log_file()?;
+        let crate::client::table::remote_log::RemoteLogFile {

Review Comment:
   Cool use of destructuring! Rust has such amazing features. The same pattern could probably be used to improve the current code base in a few places.
   
   Also nit on importing instead of full namespace.
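
   For illustration, combining both points, the destructure might read roughly like this (a sketch only; the bound fields are assumed from the `RemoteLogFile` definition quoted below, and `..` skips any that are not needed here):

   ```rust
   use crate::client::table::remote_log::RemoteLogFile;

   let remote_log_file = self.download_future.take_remote_log_file()?;
   let RemoteLogFile { file_path, permit, .. } = remote_log_file;
   ```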



##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -88,17 +146,473 @@ impl RemoteLogFetchInfo {
     }
 }
 
+/// RAII guard for prefetch permit that notifies coordinator on drop
+///
+/// NOTE: File deletion is now handled by FileSource::drop(), not here.
+/// This ensures the file is closed before deletion
+#[derive(Debug)]
+pub struct PrefetchPermit {
+    permit: Option<OwnedSemaphorePermit>,
+    recycle_notify: Arc<Notify>,
+}
+
+impl PrefetchPermit {
+    fn new(permit: OwnedSemaphorePermit, recycle_notify: Arc<Notify>) -> Self {
+        Self {
+            permit: Some(permit),
+            recycle_notify,
+        }
+    }
+}
+
+impl Drop for PrefetchPermit {
+    fn drop(&mut self) {
+        // Release capacity (critical: permit must be dropped before notify)
+        let _ = self.permit.take(); // drops permit here
+
+        // Then wake coordinator so it can acquire the now-available permit
+        self.recycle_notify.notify_one();
+    }
+}
+
+/// Downloaded remote log file with prefetch permit
+/// File remains on disk for memory efficiency; file deletion is handled by FileCleanupGuard in FileSource
+#[derive(Debug)]
+pub struct RemoteLogFile {
+    /// Path to the downloaded file on local disk
+    pub file_path: PathBuf,
+    /// Size of the file in bytes
+    /// Currently unused but kept for potential future use (logging, metrics, etc.)
+    #[allow(dead_code)]
+    pub file_size: usize,
+    /// RAII permit that releases prefetch semaphore slot and notifies coordinator when dropped
+    pub permit: PrefetchPermit,
+}
+
+/// Represents a request to download a remote log segment with priority ordering
+#[derive(Debug)]
+pub struct RemoteLogDownloadRequest {
+    segment: RemoteLogSegment,
+    remote_log_tablet_dir: String,
+    result_sender: oneshot::Sender<Result<RemoteLogFile>>,
+    retry_count: u32,
+    next_retry_at: Option<tokio::time::Instant>,
+}
+
+impl RemoteLogDownloadRequest {
+    /// Get the segment (used by test fetcher implementations)
+    #[cfg(test)]
+    pub fn segment(&self) -> &RemoteLogSegment {
+        &self.segment
+    }
+}
+
+// Total ordering for priority queue (Rust requirement: cmp==Equal implies Eq)
+// Primary: Java semantics (timestamp cross-bucket, offset within-bucket)
+// Tie-breakers: table_bucket fields (table_id, partition_id, bucket_id), then segment_id
+impl Ord for RemoteLogDownloadRequest {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        if self.segment.table_bucket == other.segment.table_bucket {
+            // Same bucket: order by start_offset (ascending - earlier segments first)
+            self.segment
+                .start_offset
+                .cmp(&other.segment.start_offset)
+                .then_with(|| self.segment.segment_id.cmp(&other.segment.segment_id))
+        } else {
+            // Different buckets: order by max_timestamp (ascending - older segments first)
+            // Then by table_bucket fields for true total ordering
+            self.segment
+                .max_timestamp
+                .cmp(&other.segment.max_timestamp)
+                .then_with(|| {
+                    self.segment
+                        .table_bucket
+                        .table_id()
+                        .cmp(&other.segment.table_bucket.table_id())
+                })
+                .then_with(|| {
+                    self.segment
+                        .table_bucket
+                        .partition_id()
+                        .cmp(&other.segment.table_bucket.partition_id())
+                })
+                .then_with(|| {
+                    self.segment
+                        .table_bucket
+                        .bucket_id()
+                        .cmp(&other.segment.table_bucket.bucket_id())
+                })
+                .then_with(|| self.segment.segment_id.cmp(&other.segment.segment_id))

Review Comment:
   I do not see these in the Java implementation; is this necessitated by Rust's total-ordering requirement for `Ord`?

   nit: Comparing by table bucket id means that certain buckets will always get priority over others (in the unlikely event of equal timestamps). I wonder if we could use something else to prioritise by the order/time at which the request was made?
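
   One possibility, sketched standalone below and not verified against the rest of the PR: stamp each request with a monotonically increasing sequence number when it is created and use that as the final tie-breaker, so a timestamp tie falls back to arrival order rather than to a fixed bucket ordering. The counter, struct, and method names here are hypothetical.

   ```rust
   use std::cmp::Ordering;
   use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};

   // Hypothetical process-wide counter, stamped onto each request at creation time.
   static NEXT_ENQUEUE_SEQ: AtomicU64 = AtomicU64::new(0);

   fn next_enqueue_seq() -> u64 {
       NEXT_ENQUEUE_SEQ.fetch_add(1, AtomicOrdering::Relaxed)
   }

   // Simplified stand-in for the real request type, just to show the tie-breaker.
   struct Request {
       max_timestamp: i64,
       enqueue_seq: u64,
   }

   impl Request {
       fn cmp_priority(&self, other: &Self) -> Ordering {
           self.max_timestamp
               .cmp(&other.max_timestamp)
               // on a timestamp tie, fall back to the order the requests were created
               .then_with(|| self.enqueue_seq.cmp(&other.enqueue_seq))
       }
   }
   ```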



##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -644,6 +647,180 @@ impl CompletedFetch for DefaultCompletedFetch {
     }
 }
 
+/// Completed fetch for remote log segments
+/// Matches Java's RemoteCompletedFetch design - separate class for remote vs local
+/// Holds RAII permit until consumed (data is in inner)
+pub struct RemoteCompletedFetch {
+    inner: DefaultCompletedFetch,
+    permit: Option<crate::client::table::remote_log::PrefetchPermit>,
+}
+
+impl RemoteCompletedFetch {
+    pub fn new(
+        inner: DefaultCompletedFetch,
+        permit: crate::client::table::remote_log::PrefetchPermit,
+    ) -> Self {
+        Self {
+            inner,
+            permit: Some(permit),
+        }
+    }
+}
+
+impl CompletedFetch for RemoteCompletedFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        self.inner.table_bucket()
+    }
+
+    fn api_error(&self) -> Option<&ApiError> {
+        self.inner.api_error()
+    }
+
+    fn fetch_error_context(&self) -> Option<&FetchErrorContext> {
+        self.inner.fetch_error_context()
+    }
+
+    fn take_error(&mut self) -> Option<Error> {
+        self.inner.take_error()
+    }
+
+    fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>> {
+        self.inner.fetch_records(max_records)
+    }
+
+    fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>> {
+        self.inner.fetch_batches(max_batches)
+    }
+
+    fn is_consumed(&self) -> bool {
+        self.inner.is_consumed()
+    }
+
+    fn records_read(&self) -> usize {
+        self.inner.records_read()
+    }
+
+    fn drain(&mut self) {
+        self.inner.drain();
+        // Release permit immediately (don't wait for struct drop)
+        // Critical: allows prefetch to continue even if the Box<dyn CompletedFetch> is kept around
+        self.permit.take(); // drops permit here, triggers recycle notification
+    }
+
+    fn size_in_bytes(&self) -> usize {
+        self.inner.size_in_bytes()
+    }
+
+    fn high_watermark(&self) -> i64 {
+        self.inner.high_watermark()
+    }
+
+    fn is_initialized(&self) -> bool {
+        self.inner.is_initialized()
+    }
+
+    fn set_initialized(&mut self) {
+        self.inner.set_initialized()
+    }
+
+    fn next_fetch_offset(&self) -> i64 {
+        self.inner.next_fetch_offset()
+    }
+}
+// Permit released explicitly in drain() or automatically when struct drops
+
+/// Pending fetch that waits for remote log file to be downloaded
+pub struct RemotePendingFetch {
+    segment: crate::client::table::remote_log::RemoteLogSegment,
+    download_future: crate::client::table::remote_log::RemoteLogDownloadFuture,

Review Comment:
   nit: import instead of using full namespace



##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -88,17 +146,473 @@ impl RemoteLogFetchInfo {
     }
 }
 
+/// RAII guard for prefetch permit that notifies coordinator on drop
+///
+/// NOTE: File deletion is now handled by FileSource::drop(), not here.
+/// This ensures the file is closed before deletion
+#[derive(Debug)]
+pub struct PrefetchPermit {
+    permit: Option<OwnedSemaphorePermit>,
+    recycle_notify: Arc<Notify>,
+}
+
+impl PrefetchPermit {
+    fn new(permit: OwnedSemaphorePermit, recycle_notify: Arc<Notify>) -> Self {
+        Self {
+            permit: Some(permit),
+            recycle_notify,
+        }
+    }
+}
+
+impl Drop for PrefetchPermit {
+    fn drop(&mut self) {

Review Comment:
   Cool use of drop to return permit and notify!
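
   For readers following along, a standalone sketch of the general drop-then-notify shape (simplified; not the PR code itself): the permit is dropped first so the semaphore slot is already free when the woken coordinator tries to acquire again.

   ```rust
   use std::sync::Arc;
   use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore};

   struct PermitGuard {
       permit: Option<OwnedSemaphorePermit>,
       recycle_notify: Arc<Notify>,
   }

   impl Drop for PermitGuard {
       fn drop(&mut self) {
           // Free the semaphore slot first, then wake the coordinator so its
           // next acquire observes the released capacity.
           drop(self.permit.take());
           self.recycle_notify.notify_one();
       }
   }

   #[tokio::main]
   async fn main() {
       let sem = Arc::new(Semaphore::new(1));
       let notify = Arc::new(Notify::new());

       let permit = sem.clone().acquire_owned().await.unwrap();
       let guard = PermitGuard { permit: Some(permit), recycle_notify: notify.clone() };

       // Coordinator side: wait for a recycle notification, then re-acquire.
       let coordinator = tokio::spawn({
           let (sem, notify) = (sem.clone(), notify.clone());
           async move {
               notify.notified().await;
               let _slot = sem.acquire_owned().await.unwrap();
           }
       });

       drop(guard); // releases the slot and notifies the coordinator
       coordinator.await.unwrap();
   }
   ```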


