luoyuxia commented on code in PR #154:
URL: https://github.com/apache/fluss-rust/pull/154#discussion_r2727492068
##########
bindings/python/src/table.rs:
##########
@@ -960,31 +960,56 @@ impl LogScanner {
.block_on(async {
self.inner.poll(Duration::from_millis(500)).await });
match batch_result {
- Ok(scan_records) => {
- let mut result_records: Vec<fcore::record::ScanRecord>
= vec![];
- for (bucket, records) in
scan_records.into_records_by_buckets() {
- let stopping_offset =
stopping_offsets.get(&bucket.bucket_id());
-
- if stopping_offset.is_none() {
- // not to include this bucket, skip records
for this bucket
- // since we already reach end offset for this
bucket
- continue;
- }
- if let Some(last_record) = records.last() {
- let offset = last_record.offset();
- result_records.extend(records);
- if offset >= stopping_offset.unwrap() - 1 {
-
stopping_offsets.remove(&bucket.bucket_id());
+ Ok(scan_batches) => {
+ for scan_batch in scan_batches {
+ let bucket_id = scan_batch.bucket().bucket_id();
+
+ // Extract stopping_offset once to avoid double
unwrap
+ let stop_exclusive = match
stopping_offsets.get(&bucket_id) {
+ Some(&offset) => offset,
+ None => {
+ // we already reached end offset for this
bucket
+ continue;
}
- }
- }
+ };
+
+ // Compute the inclusive last offset we want
(stop_exclusive - 1)
+ let stop_inclusive = match
stop_exclusive.checked_sub(1) {
+ Some(v) => v,
+ None => {
+ // stop_exclusive was 0 or negative -
nothing to read
Review Comment:
If `stop_exclusive` is 1, it seems execution will come into this branch and skip reading anything.
But actually, we need to read one record.
Just personal taste — would it be simpler to use the following code:
```
stopping_offsets.retain(|_, &mut v| v > 0);
while !stopping_offsets.is_empty() {
let scan_batches = TOKIO_RUNTIME
.block_on(async {
self.inner.poll(Duration::from_millis(500)).await })
.map_err(|e| FlussError::new_err(e.to_string()))?;
if scan_batches.is_empty() {
continue;
}
for scan_batch in scan_batches {
let bucket_id = scan_batch.bucket().bucket_id();
// Check if this bucket is still being tracked; if not,
ignore the batch
let Some(&stop_at) = stopping_offsets.get(&bucket_id) else {
continue;
};
let base_offset = scan_batch.base_offset();
let last_offset = scan_batch.last_offset();
// If the batch starts at or after the stop_at offset, the
bucket is exhausted
if base_offset >= stop_at {
stopping_offsets.remove(&bucket_id);
continue;
}
let batch = if last_offset >= stop_at {
// This batch contains the target offset; slice it to
keep only records
// where offset < stop_at.
let num_to_keep = (stop_at - base_offset) as usize;
let b = scan_batch.into_batch();
// Safety check: ensure we don't attempt to slice more
rows than the batch contains
let limit = num_to_keep.min(b.num_rows());
b.slice(0, limit)
} else {
// The entire batch is within the desired range (all
offsets < stop_at)
scan_batch.into_batch()
};
all_batches.push(Arc::new(batch));
// If the batch's last offset reached or passed the
inclusive limit (stop_at - 1),
// we are done with this bucket.
if last_offset >= stop_at - 1 {
stopping_offsets.remove(&bucket_id);
}
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]