fresh-borzoni commented on code in PR #187:
URL: https://github.com/apache/fluss-rust/pull/187#discussion_r2713277799
##########
crates/fluss/src/client/table/log_fetch_buffer.rs:
##########
@@ -644,6 +647,180 @@ impl CompletedFetch for DefaultCompletedFetch {
}
}
+/// Completed fetch for remote log segments
+/// Matches Java's RemoteCompletedFetch design - separate class for remote vs local
+/// Holds RAII permit until consumed (data is in inner)
+pub struct RemoteCompletedFetch {
+ inner: DefaultCompletedFetch,
+ permit: Option<crate::client::table::remote_log::PrefetchPermit>,
+}
+
+impl RemoteCompletedFetch {
+ pub fn new(
+ inner: DefaultCompletedFetch,
+ permit: crate::client::table::remote_log::PrefetchPermit,
+ ) -> Self {
+ Self {
+ inner,
+ permit: Some(permit),
+ }
+ }
+}
+
+impl CompletedFetch for RemoteCompletedFetch {
+ fn table_bucket(&self) -> &TableBucket {
+ self.inner.table_bucket()
+ }
+
+ fn api_error(&self) -> Option<&ApiError> {
+ self.inner.api_error()
+ }
+
+ fn fetch_error_context(&self) -> Option<&FetchErrorContext> {
+ self.inner.fetch_error_context()
+ }
+
+ fn take_error(&mut self) -> Option<Error> {
+ self.inner.take_error()
+ }
+
+ fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>> {
+ self.inner.fetch_records(max_records)
+ }
+
+ fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>> {
+ self.inner.fetch_batches(max_batches)
+ }
+
+ fn is_consumed(&self) -> bool {
+ self.inner.is_consumed()
+ }
+
+ fn records_read(&self) -> usize {
+ self.inner.records_read()
+ }
+
+ fn drain(&mut self) {
+ self.inner.drain();
+ // Release permit immediately (don't wait for struct drop)
+ // Critical: allows prefetch to continue even if Box<dyn CompletedFetch> kept around
+ self.permit.take(); // drops permit here, triggers recycle notification
+ }
+
+ fn size_in_bytes(&self) -> usize {
+ self.inner.size_in_bytes()
+ }
+
+ fn high_watermark(&self) -> i64 {
+ self.inner.high_watermark()
+ }
+
+ fn is_initialized(&self) -> bool {
+ self.inner.is_initialized()
+ }
+
+ fn set_initialized(&mut self) {
+ self.inner.set_initialized()
+ }
+
+ fn next_fetch_offset(&self) -> i64 {
+ self.inner.next_fetch_offset()
+ }
+}
+// Permit released explicitly in drain() or automatically when struct drops
+
+/// Pending fetch that waits for remote log file to be downloaded
+pub struct RemotePendingFetch {
+ segment: crate::client::table::remote_log::RemoteLogSegment,
+ download_future: crate::client::table::remote_log::RemoteLogDownloadFuture,
+ pos_in_log_segment: i32,
+ fetch_offset: i64,
+ high_watermark: i64,
+ read_context: ReadContext,
+}
+
+impl RemotePendingFetch {
+ pub fn new(
+ segment: crate::client::table::remote_log::RemoteLogSegment,
+ download_future: crate::client::table::remote_log::RemoteLogDownloadFuture,
+ pos_in_log_segment: i32,
+ fetch_offset: i64,
+ high_watermark: i64,
+ read_context: ReadContext,
+ ) -> Self {
+ Self {
+ segment,
+ download_future,
+ pos_in_log_segment,
+ fetch_offset,
+ high_watermark,
+ read_context,
+ }
+ }
+}
+
+impl PendingFetch for RemotePendingFetch {
+ fn table_bucket(&self) -> &TableBucket {
+ &self.segment.table_bucket
+ }
+
+ fn is_completed(&self) -> bool {
+ self.download_future.is_done()
+ }
+
+ fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+ // Take the RemoteLogFile and destructure
+ let remote_log_file = self.download_future.take_remote_log_file()?;
+ let crate::client::table::remote_log::RemoteLogFile {
Review Comment:
double hit :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]