luoyuxia commented on code in PR #103:
URL: https://github.com/apache/fluss-rust/pull/103#discussion_r2637521822


##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -211,125 +260,292 @@ impl LogFetcher {
         projected_fields: Option<Vec<usize>>,
     ) -> Result<Self> {
         let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
-        let read_context = Self::create_read_context(full_arrow_schema, 
projected_fields.clone());
+        let read_context =
+            Self::create_read_context(full_arrow_schema.clone(), 
projected_fields.clone(), false);
+        let remote_read_context =
+            Self::create_read_context(full_arrow_schema, 
projected_fields.clone(), true);
 
         let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?;
 
         Ok(LogFetcher {
-            table_path: table_info.table_path.clone(),
-            conns,
-            table_info,
-            metadata,
+            conns: conns.clone(),
+            metadata: metadata.clone(),
             log_scanner_status,
             read_context,
-            remote_log_downloader: RemoteLogDownloader::new(tmp_dir)?,
-            credentials_cache: CredentialsCache::new(),
+            remote_read_context,
+            remote_log_downloader: 
Arc::new(RemoteLogDownloader::new(tmp_dir)?),
+            credentials_cache: Arc::new(CredentialsCache::new(conns.clone(), 
metadata.clone())),
+            log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
+            nodes_with_pending_fetch_requests: 
Arc::new(Mutex::new(HashSet::new())),
         })
     }
 
     fn create_read_context(
         full_arrow_schema: SchemaRef,
         projected_fields: Option<Vec<usize>>,
+        is_from_remote: bool,
     ) -> ReadContext {
         match projected_fields {
-            None => ReadContext::new(full_arrow_schema),
-            Some(fields) => 
ReadContext::with_projection_pushdown(full_arrow_schema, fields),
+            None => ReadContext::new(full_arrow_schema, is_from_remote),
+            Some(fields) => {
+                ReadContext::with_projection_pushdown(full_arrow_schema, 
fields, is_from_remote)
+            }
         }
     }
 
-    async fn send_fetches_and_collect(&self) -> Result<HashMap<TableBucket, 
Vec<ScanRecord>>> {
+    /// Send fetch requests asynchronously without waiting for responses
+    async fn send_fetches(&self) -> Result<()> {
+        // todo: check update metadata like fluss-java in case leader changes
         let fetch_request = self.prepare_fetch_log_requests().await;
-        let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
+
         for (leader, fetch_request) in fetch_request {
-            let cluster = self.metadata.get_cluster();
-            let server_node = cluster
-                .get_tablet_server(leader)
-                .expect("todo: handle leader not exist.");
-            let con = self.conns.get_connection(server_node).await?;
-
-            let fetch_response = con
-                
.request(crate::rpc::message::FetchLogRequest::new(fetch_request))
-                .await?;
-
-            for pb_fetch_log_resp in fetch_response.tables_resp {
-                let table_id = pb_fetch_log_resp.table_id;
-                let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
-
-                for fetch_log_for_bucket in fetch_log_for_buckets {
-                    let bucket: i32 = fetch_log_for_bucket.bucket_id;
-                    let table_bucket = TableBucket::new(table_id, bucket);
-
-                    // Check if this is a remote log fetch
-                    if let Some(ref remote_log_fetch_info) =
-                        fetch_log_for_bucket.remote_log_fetch_info
-                    {
-                        let remote_fs_props = self
-                            .credentials_cache
-                            .get_or_refresh(&self.conns, &self.metadata)
-                            .await?;
-                        self.remote_log_downloader
-                            .set_remote_fs_props(remote_fs_props);
-                        let remote_fetch_info = RemoteLogFetchInfo::from_proto(
-                            remote_log_fetch_info,
-                            table_bucket.clone(),
-                        )?;
-
-                        if let Some(fetch_offset) =
-                            
self.log_scanner_status.get_bucket_offset(&table_bucket)
-                        {
-                            let high_watermark = 
fetch_log_for_bucket.high_watermark.unwrap_or(-1);
-                            // Download and process remote log segments
-                            let mut pos_in_log_segment = 
remote_fetch_info.first_start_pos;
-                            let mut current_fetch_offset = fetch_offset;
-                            // todo: make segment download in parallel
-                            for (i, segment) in
-                                
remote_fetch_info.remote_log_segments.iter().enumerate()
-                            {
-                                if i > 0 {
-                                    pos_in_log_segment = 0;
-                                    current_fetch_offset = 
segment.start_offset;
-                                }
+            debug!("Adding pending request for node id {leader}");
+            // Check if we already have a pending request for this node
+            {
+                self.nodes_with_pending_fetch_requests.lock().insert(leader);
+            }
+
+            let cluster = self.metadata.get_cluster().clone();
+
+            let conns = Arc::clone(&self.conns);
+            let log_fetch_buffer = self.log_fetch_buffer.clone();
+            let log_scanner_status = self.log_scanner_status.clone();
+            let read_context = self.read_context.clone();
+            let remote_read_context = self.remote_read_context.clone();
+            let remote_log_downloader = 
Arc::clone(&self.remote_log_downloader);
+            let creds_cache = self.credentials_cache.clone();
+            let nodes_with_pending = 
self.nodes_with_pending_fetch_requests.clone();
+
+            // Spawn async task to handle the fetch request
+            tokio::spawn(async move {

Review Comment:
   If LogFetcher is dropped, the task will still run, but after the task finishes, it will be 
removed. I think it's fine for now since the Java side also has the same behavior. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to