luoyuxia commented on code in PR #103:
URL: https://github.com/apache/fluss-rust/pull/103#discussion_r2637515748


##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -286,32 +351,54 @@ impl RemotePendingFetch {
             read_context,
         }
     }
+}
+
+impl PendingFetch for RemotePendingFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        &self.segment.table_bucket
+    }
 
-    /// Convert to completed fetch by reading the downloaded file
-    pub async fn convert_to_completed_fetch(
-        mut self,
-    ) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
-        let file_path = self.download_future.get_file_path().await?;
-        let file_data = tokio::fs::read(&file_path).await?;
+    fn is_completed(&self) -> bool {
+        self.download_future.is_done()
+    }
+
+    fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+        // Get the file path (this should only be called when is_completed() 
returns true)
+        let file_path = self.download_future.get_file_path()?;
+        // Read the file data synchronously (we're in a sync context)
+        // Note: This is a limitation - we need to use blocking I/O here
+        let mut file_data = std::fs::read(&file_path).map_err(|e| 
Error::IoUnexpectedError {
+            message: format!("Failed to read downloaded file: {file_path:?}."),
+            source: e,
+        })?;
 
         // Slice the data if needed
         let data = if self.pos_in_log_segment > 0 {
-            &file_data[self.pos_in_log_segment as usize..]
+            file_data.split_off(self.pos_in_log_segment as usize)
         } else {
-            &file_data
+            file_data
         };
 
-        // delete the downloaded local file to free disk
-        delete_file(file_path).await;
+        let size_in_bytes = data.len();
 
-        // Parse log records (remote log contains full data, need client-side 
projection)
-        let mut fetch_records = vec![];
-        for log_record in &mut LogRecordsBatchs::new(data) {
-            
fetch_records.extend(log_record.records_for_remote_log(&self.read_context)?);
-        }
+        let log_record_batch = LogRecordsBatches::new(data);
+
+        // Create DefaultCompletedFetch from the data
+        let completed_fetch = DefaultCompletedFetch::new(
+            self.segment.table_bucket,
+            log_record_batch,
+            size_in_bytes,
+            self.read_context,
+            self.fetch_offset,
+            self.high_watermark,
+        )?;
+
+        // Delete the downloaded local file to free disk (async, but we'll do 
it in background)
+        let file_path_clone = file_path.clone();
+        tokio::spawn(async move {
+            let _ = delete_file(file_path_clone).await;

Review Comment:
   `delete_file` will warn about it.
   After `RemoteLogDownloader` goes out of scope, the `TempDir` will be 
dropped, causing all the downloaded files to be deleted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to