luoyuxia commented on code in PR #143:
URL: https://github.com/apache/fluss-rust/pull/143#discussion_r2679536693


##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -803,7 +1013,19 @@ impl LogFetcher {
                 // Fetch records from next_in_line
                 if let Some(mut next_fetch) = next_in_line {
                     let records =
-                        self.fetch_records_from_fetch(&mut next_fetch, 
records_remaining)?;
+                        match self.fetch_records_from_fetch(&mut next_fetch, 
records_remaining) {
+                            Ok(records) => records,
+                            Err(e) => {
+                                if result.is_empty() {
+                                    return Err(e);
+                                }
+                                if !next_fetch.is_consumed() {

Review Comment:
   I found that this duplicates the logic at  
https://github.com/apache/fluss-rust/pull/143/files#diff-540a310c6f8139ea4592281917afecdd364792ffc3655a65221307465cabd09eR1041
   and I think this piece of code is error-prone. I'm wondering whether the 
following code would be more straightforward and more consistent with the Java 
side, to avoid potential bugs:
   
   ```
   let collect_result: Result<()> = {
               while records_remaining > 0 {
                   // Get the next in line fetch, or get a new one from buffer
                   let next_in_line = 
self.log_fetch_buffer.next_in_line_fetch();
                   if next_in_line.is_none() || 
next_in_line.as_ref().unwrap().is_consumed() {
                       // Get a new fetch from buffer
                       if let Some(completed_fetch) = 
self.log_fetch_buffer.poll() {
                           // Initialize the fetch if not already initialized
                           if !completed_fetch.is_initialized() {
                               let size_in_bytes = 
completed_fetch.size_in_bytes();
                               match self.initialize_fetch(completed_fetch) {
                                   Ok(initialized) => {
                                       
self.log_fetch_buffer.set_next_in_line_fetch(initialized);
                                       continue;
                                   }
                                   Err(e) => {
                                       if result.is_empty() && size_in_bytes == 
0 {
                                           // todo: add back completed_fetch to 
log_fetch_buffer, 
                                           // otherwise, we will skip this 
fetch error by mistake, which cause
                                           // data loss but user never have 
chance to know it. 
                                           
                                       }
                                       return Err(e);
                                   }
                               }
                           } else {
                               self.log_fetch_buffer
                                   
.set_next_in_line_fetch(Some(completed_fetch));
                           }
                       } else {
                           // No more fetches available
                           break;
                       }
                   } else {
                       // Fetch records from next_in_line
                       if let Some(mut next_fetch) = next_in_line {
                           let records =
                               self.fetch_records_from_fetch(&mut next_fetch, 
records_remaining)?;
   
                           if !records.is_empty() {
                               let table_bucket = 
next_fetch.table_bucket().clone();
                               // Merge with existing records for this bucket
                               let existing = 
result.entry(table_bucket).or_default();
                               let records_count = records.len();
                               existing.extend(records);
   
                               records_remaining = 
records_remaining.saturating_sub(records_count);
                           }
   
                           // If the fetch is not fully consumed, put it back 
for the next round
                           if !next_fetch.is_consumed() {
                               self.log_fetch_buffer
                                   .set_next_in_line_fetch(Some(next_fetch));
                           }
                           // If consumed, next_fetch will be dropped here 
(which is correct)
                       }
                   }
               }
               Ok(())
           };
   
           match collect_result {
               Ok(_) => Ok(result),
               Err(e) => {
                   // If we have already collected records, suppress the 
exception 
                   // and return the partial results to the user first.
                   if result.is_empty() {
                       Err(e)
                   } else {
                       // Swallow the exception and return the currently 
accumulated result.
                       // The error will likely be re-triggered during the next 
poll attempt.
                       Ok(result)
                   }
               }
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to