luoyuxia commented on code in PR #154:
URL: https://github.com/apache/fluss-rust/pull/154#discussion_r2727492068


##########
bindings/python/src/table.rs:
##########
@@ -960,31 +960,56 @@ impl LogScanner {
                     .block_on(async { 
self.inner.poll(Duration::from_millis(500)).await });
 
                 match batch_result {
-                    Ok(scan_records) => {
-                        let mut result_records: Vec<fcore::record::ScanRecord> 
= vec![];
-                        for (bucket, records) in 
scan_records.into_records_by_buckets() {
-                            let stopping_offset = 
stopping_offsets.get(&bucket.bucket_id());
-
-                            if stopping_offset.is_none() {
-                                // not to include this bucket, skip records 
for this bucket
-                                // since we already reach end offset for this 
bucket
-                                continue;
-                            }
-                            if let Some(last_record) = records.last() {
-                                let offset = last_record.offset();
-                                result_records.extend(records);
-                                if offset >= stopping_offset.unwrap() - 1 {
-                                    
stopping_offsets.remove(&bucket.bucket_id());
+                    Ok(scan_batches) => {
+                        for scan_batch in scan_batches {
+                            let bucket_id = scan_batch.bucket().bucket_id();
+
+                            // Extract stopping_offset once to avoid double 
unwrap
+                            let stop_exclusive = match 
stopping_offsets.get(&bucket_id) {
+                                Some(&offset) => offset,
+                                None => {
+                                    // we already reached end offset for this 
bucket
+                                    continue;
                                 }
-                            }
-                        }
+                            };
+
+                            // Compute the inclusive last offset we want 
(stop_exclusive - 1)
+                            let stop_inclusive = match 
stop_exclusive.checked_sub(1) {
+                                Some(v) => v,
+                                None => {
+                                    // stop_exclusive was 0 or negative - 
nothing to read

Review Comment:
   If `stop_exclusive` is 1, it seems we will fall into this branch and skip 
reading anything. But actually, we still need to read one record.
   
   Just personal taste, but would it be simpler to use the following code:
   ```
   stopping_offsets.retain(|_, &mut v| v > 0);
   
           while !stopping_offsets.is_empty() {
               let scan_batches = TOKIO_RUNTIME
                   .block_on(async { 
self.inner.poll(Duration::from_millis(500)).await })
                   .map_err(|e| FlussError::new_err(e.to_string()))?;
   
               if scan_batches.is_empty() {
                   continue;
               }
   
               for scan_batch in scan_batches {
                   let bucket_id = scan_batch.bucket().bucket_id();
   
                   // Check if this bucket is still being tracked; if not, 
ignore the batch
                   let Some(&stop_at) = stopping_offsets.get(&bucket_id) else {
                       continue;
                   };
   
                   let base_offset = scan_batch.base_offset();
                   let last_offset = scan_batch.last_offset();
   
                   // If the batch starts at or after the stop_at offset, the 
bucket is exhausted
                   if base_offset >= stop_at {
                       stopping_offsets.remove(&bucket_id);
                       continue;
                   }
   
                   let batch = if last_offset >= stop_at {
                       // This batch contains the target offset; slice it to 
keep only records 
                       // where offset < stop_at.
                       let num_to_keep = (stop_at - base_offset) as usize;
                       let b = scan_batch.into_batch();
   
                       // Safety check: ensure we don't attempt to slice more 
rows than the batch contains
                       let limit = num_to_keep.min(b.num_rows());
                       b.slice(0, limit)
                   } else {
                       // The entire batch is within the desired range (all 
offsets < stop_at)
                       scan_batch.into_batch()
                   };
   
                   all_batches.push(Arc::new(batch));
   
                   // If the batch's last offset reached or passed the 
inclusive limit (stop_at - 1),
                   // we are done with this bucket.
                   if last_offset >= stop_at - 1 {
                       stopping_offsets.remove(&bucket_id);
                   }
               }
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to