zhaohaidao commented on code in PR #103:
URL: https://github.com/apache/fluss-rust/pull/103#discussion_r2637121366


##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -211,125 +260,292 @@ impl LogFetcher {
         projected_fields: Option<Vec<usize>>,
     ) -> Result<Self> {
         let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
-        let read_context = Self::create_read_context(full_arrow_schema, projected_fields.clone());
+        let read_context =
+            Self::create_read_context(full_arrow_schema.clone(), projected_fields.clone(), false);
+        let remote_read_context =
+            Self::create_read_context(full_arrow_schema, projected_fields.clone(), true);
 
         let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?;
 
         Ok(LogFetcher {
-            table_path: table_info.table_path.clone(),
-            conns,
-            table_info,
-            metadata,
+            conns: conns.clone(),
+            metadata: metadata.clone(),
             log_scanner_status,
             read_context,
-            remote_log_downloader: RemoteLogDownloader::new(tmp_dir)?,
-            credentials_cache: CredentialsCache::new(),
+            remote_read_context,
+            remote_log_downloader: Arc::new(RemoteLogDownloader::new(tmp_dir)?),
+            credentials_cache: Arc::new(CredentialsCache::new(conns.clone(), metadata.clone())),
+            log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
+            nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())),
         })
     }
 
     fn create_read_context(
         full_arrow_schema: SchemaRef,
         projected_fields: Option<Vec<usize>>,
+        is_from_remote: bool,
     ) -> ReadContext {
         match projected_fields {
-            None => ReadContext::new(full_arrow_schema),
-            Some(fields) => ReadContext::with_projection_pushdown(full_arrow_schema, fields),
+            None => ReadContext::new(full_arrow_schema, is_from_remote),
+            Some(fields) => {
+                ReadContext::with_projection_pushdown(full_arrow_schema, fields, is_from_remote)
+            }
         }
     }
 
-    async fn send_fetches_and_collect(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
+    /// Send fetch requests asynchronously without waiting for responses
+    async fn send_fetches(&self) -> Result<()> {
+        // todo: check update metadata like fluss-java in case leader changes
         let fetch_request = self.prepare_fetch_log_requests().await;
-        let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
+
         for (leader, fetch_request) in fetch_request {
-            let cluster = self.metadata.get_cluster();
-            let server_node = cluster
-                .get_tablet_server(leader)
-                .expect("todo: handle leader not exist.");
-            let con = self.conns.get_connection(server_node).await?;
-
-            let fetch_response = con
-                .request(crate::rpc::message::FetchLogRequest::new(fetch_request))
-                .await?;
-
-            for pb_fetch_log_resp in fetch_response.tables_resp {
-                let table_id = pb_fetch_log_resp.table_id;
-                let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
-
-                for fetch_log_for_bucket in fetch_log_for_buckets {
-                    let bucket: i32 = fetch_log_for_bucket.bucket_id;
-                    let table_bucket = TableBucket::new(table_id, bucket);
-
-                    // Check if this is a remote log fetch
-                    if let Some(ref remote_log_fetch_info) =
-                        fetch_log_for_bucket.remote_log_fetch_info
-                    {
-                        let remote_fs_props = self
-                            .credentials_cache
-                            .get_or_refresh(&self.conns, &self.metadata)
-                            .await?;
-                        self.remote_log_downloader
-                            .set_remote_fs_props(remote_fs_props);
-                        let remote_fetch_info = RemoteLogFetchInfo::from_proto(
-                            remote_log_fetch_info,
-                            table_bucket.clone(),
-                        )?;
-
-                        if let Some(fetch_offset) =
-                            self.log_scanner_status.get_bucket_offset(&table_bucket)
-                        {
-                            let high_watermark = fetch_log_for_bucket.high_watermark.unwrap_or(-1);
-                            // Download and process remote log segments
-                            let mut pos_in_log_segment = remote_fetch_info.first_start_pos;
-                            let mut current_fetch_offset = fetch_offset;
-                            // todo: make segment download in parallel
-                            for (i, segment) in
-                                remote_fetch_info.remote_log_segments.iter().enumerate()
-                            {
-                                if i > 0 {
-                                    pos_in_log_segment = 0;
-                                    current_fetch_offset = segment.start_offset;
-                                }
+            debug!("Adding pending request for node id {leader}");
+            // Check if we already have a pending request for this node
+            {
+                self.nodes_with_pending_fetch_requests.lock().insert(leader);
+            }
+
+            let cluster = self.metadata.get_cluster().clone();
+
+            let conns = Arc::clone(&self.conns);
+            let log_fetch_buffer = self.log_fetch_buffer.clone();
+            let log_scanner_status = self.log_scanner_status.clone();
+            let read_context = self.read_context.clone();
+            let remote_read_context = self.remote_read_context.clone();
+            let remote_log_downloader = Arc::clone(&self.remote_log_downloader);
+            let creds_cache = self.credentials_cache.clone();
+            let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone();
+
+            // Spawn async task to handle the fetch request
+            tokio::spawn(async move {
+                // make sure it will always remote leader from pending nodes

Review Comment:
   typo? s/remote/remove
   



##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -286,32 +351,54 @@ impl RemotePendingFetch {
             read_context,
         }
     }
+}
+
+impl PendingFetch for RemotePendingFetch {
+    fn table_bucket(&self) -> &TableBucket {
+        &self.segment.table_bucket
+    }
 
-    /// Convert to completed fetch by reading the downloaded file
-    pub async fn convert_to_completed_fetch(
-        mut self,
-    ) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
-        let file_path = self.download_future.get_file_path().await?;
-        let file_data = tokio::fs::read(&file_path).await?;
+    fn is_completed(&self) -> bool {
+        self.download_future.is_done()
+    }
+
+    fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+        // Get the file path (this should only be called when is_completed() returns true)
+        let file_path = self.download_future.get_file_path()?;
+        // Read the file data synchronously (we're in a sync context)
+        // Note: This is a limitation - we need to use blocking I/O here
+        let mut file_data = std::fs::read(&file_path).map_err(|e| Error::IoUnexpectedError {
+            message: format!("Failed to read downloaded file: {file_path:?}."),
+            source: e,
+        })?;
 
         // Slice the data if needed
         let data = if self.pos_in_log_segment > 0 {
-            &file_data[self.pos_in_log_segment as usize..]
+            file_data.split_off(self.pos_in_log_segment as usize)
         } else {
-            &file_data
+            file_data
         };
 
-        // delete the downloaded local file to free disk
-        delete_file(file_path).await;
+        let size_in_bytes = data.len();
 
-        // Parse log records (remote log contains full data, need client-side projection)
-        let mut fetch_records = vec![];
-        for log_record in &mut LogRecordsBatchs::new(data) {
-            fetch_records.extend(log_record.records_for_remote_log(&self.read_context)?);
-        }
+        let log_record_batch = LogRecordsBatches::new(data);
+
+        // Create DefaultCompletedFetch from the data
+        let completed_fetch = DefaultCompletedFetch::new(
+            self.segment.table_bucket,
+            log_record_batch,
+            size_in_bytes,
+            self.read_context,
+            self.fetch_offset,
+            self.high_watermark,
+        )?;
+
+        // Delete the downloaded local file to free disk (async, but we'll do it in background)
+        let file_path_clone = file_path.clone();
+        tokio::spawn(async move {
+            let _ = delete_file(file_path_clone).await;

Review Comment:
   If deletion fails, will there be leftover files on disk? It's recommended to at least log a warning when deletion fails.
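
   A minimal sketch of what logging the failure could look like (assuming `delete_file` returns a `Result` and that a `warn!`-style macro, e.g. from `log` or `tracing`, is in scope; the snippet is illustrative, not part of this PR):

   ```rust
   // Sketch only: surface deletion failures instead of silently discarding them.
   let file_path_clone = file_path.clone();
   tokio::spawn(async move {
       if let Err(e) = delete_file(file_path_clone.clone()).await {
           // The temp file stays on disk until the temp dir is cleaned up,
           // so at least make the failure visible in the logs.
           warn!("failed to delete downloaded remote log file {file_path_clone:?}: {e}");
       }
   });
   ```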



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -211,125 +260,292 @@ impl LogFetcher {
         projected_fields: Option<Vec<usize>>,
     ) -> Result<Self> {
         let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
-        let read_context = Self::create_read_context(full_arrow_schema, projected_fields.clone());
+        let read_context =
+            Self::create_read_context(full_arrow_schema.clone(), projected_fields.clone(), false);
+        let remote_read_context =
+            Self::create_read_context(full_arrow_schema, projected_fields.clone(), true);
 
         let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?;
 
         Ok(LogFetcher {
-            table_path: table_info.table_path.clone(),
-            conns,
-            table_info,
-            metadata,
+            conns: conns.clone(),
+            metadata: metadata.clone(),
             log_scanner_status,
             read_context,
-            remote_log_downloader: RemoteLogDownloader::new(tmp_dir)?,
-            credentials_cache: CredentialsCache::new(),
+            remote_read_context,
+            remote_log_downloader: Arc::new(RemoteLogDownloader::new(tmp_dir)?),
+            credentials_cache: Arc::new(CredentialsCache::new(conns.clone(), metadata.clone())),
+            log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
+            nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())),
         })
     }
 
     fn create_read_context(
         full_arrow_schema: SchemaRef,
         projected_fields: Option<Vec<usize>>,
+        is_from_remote: bool,
     ) -> ReadContext {
         match projected_fields {
-            None => ReadContext::new(full_arrow_schema),
-            Some(fields) => ReadContext::with_projection_pushdown(full_arrow_schema, fields),
+            None => ReadContext::new(full_arrow_schema, is_from_remote),
+            Some(fields) => {
+                ReadContext::with_projection_pushdown(full_arrow_schema, fields, is_from_remote)
+            }
         }
     }
 
-    async fn send_fetches_and_collect(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
+    /// Send fetch requests asynchronously without waiting for responses
+    async fn send_fetches(&self) -> Result<()> {
+        // todo: check update metadata like fluss-java in case leader changes
         let fetch_request = self.prepare_fetch_log_requests().await;
-        let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
+
         for (leader, fetch_request) in fetch_request {
-            let cluster = self.metadata.get_cluster();
-            let server_node = cluster
-                .get_tablet_server(leader)
-                .expect("todo: handle leader not exist.");
-            let con = self.conns.get_connection(server_node).await?;
-
-            let fetch_response = con
-                .request(crate::rpc::message::FetchLogRequest::new(fetch_request))
-                .await?;
-
-            for pb_fetch_log_resp in fetch_response.tables_resp {
-                let table_id = pb_fetch_log_resp.table_id;
-                let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
-
-                for fetch_log_for_bucket in fetch_log_for_buckets {
-                    let bucket: i32 = fetch_log_for_bucket.bucket_id;
-                    let table_bucket = TableBucket::new(table_id, bucket);
-
-                    // Check if this is a remote log fetch
-                    if let Some(ref remote_log_fetch_info) =
-                        fetch_log_for_bucket.remote_log_fetch_info
-                    {
-                        let remote_fs_props = self
-                            .credentials_cache
-                            .get_or_refresh(&self.conns, &self.metadata)
-                            .await?;
-                        self.remote_log_downloader
-                            .set_remote_fs_props(remote_fs_props);
-                        let remote_fetch_info = RemoteLogFetchInfo::from_proto(
-                            remote_log_fetch_info,
-                            table_bucket.clone(),
-                        )?;
-
-                        if let Some(fetch_offset) =
-                            self.log_scanner_status.get_bucket_offset(&table_bucket)
-                        {
-                            let high_watermark = fetch_log_for_bucket.high_watermark.unwrap_or(-1);
-                            // Download and process remote log segments
-                            let mut pos_in_log_segment = remote_fetch_info.first_start_pos;
-                            let mut current_fetch_offset = fetch_offset;
-                            // todo: make segment download in parallel
-                            for (i, segment) in
-                                remote_fetch_info.remote_log_segments.iter().enumerate()
-                            {
-                                if i > 0 {
-                                    pos_in_log_segment = 0;
-                                    current_fetch_offset = segment.start_offset;
-                                }
+            debug!("Adding pending request for node id {leader}");
+            // Check if we already have a pending request for this node
+            {
+                self.nodes_with_pending_fetch_requests.lock().insert(leader);
+            }
+
+            let cluster = self.metadata.get_cluster().clone();
+
+            let conns = Arc::clone(&self.conns);
+            let log_fetch_buffer = self.log_fetch_buffer.clone();
+            let log_scanner_status = self.log_scanner_status.clone();
+            let read_context = self.read_context.clone();
+            let remote_read_context = self.remote_read_context.clone();
+            let remote_log_downloader = Arc::clone(&self.remote_log_downloader);
+            let creds_cache = self.credentials_cache.clone();
+            let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone();
+
+            // Spawn async task to handle the fetch request
+            tokio::spawn(async move {

Review Comment:
   If LogFetcher is stopped, is there a task leak issue? I don't see where these spawned asynchronous tasks are tracked or cancelled.
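
   One possible direction, sketched with hypothetical names (not taken from this PR): keep the `JoinHandle`s of the spawned tasks so they can be aborted when the fetcher is closed.

   ```rust
   use std::sync::Mutex;
   use tokio::task::JoinHandle;

   /// Hypothetical holder for spawned fetch tasks; the LogFetcher could own one.
   struct FetchTasks {
       handles: Mutex<Vec<JoinHandle<()>>>,
   }

   impl FetchTasks {
       /// Remember a newly spawned task, dropping handles that already finished.
       fn track(&self, handle: JoinHandle<()>) {
           let mut handles = self.handles.lock().unwrap();
           handles.retain(|h| !h.is_finished());
           handles.push(handle);
       }

       /// Abort everything still running, e.g. when the scanner is closed or dropped.
       fn abort_all(&self) {
           for handle in self.handles.lock().unwrap().drain(..) {
               handle.abort();
           }
       }
   }
   ```

   The spawn site would then become something like `fetch_tasks.track(tokio::spawn(async move { ... }))`, and a close/drop path on the fetcher could call `abort_all()`.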



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to