luoyuxia commented on code in PR #143:
URL: https://github.com/apache/fluss-rust/pull/143#discussion_r2679536693
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -803,7 +1013,19 @@ impl LogFetcher {
// Fetch records from next_in_line
if let Some(mut next_fetch) = next_in_line {
let records =
- self.fetch_records_from_fetch(&mut next_fetch,
records_remaining)?;
+ match self.fetch_records_from_fetch(&mut next_fetch,
records_remaining) {
+ Ok(records) => records,
+ Err(e) => {
+ if result.is_empty() {
+ return Err(e);
+ }
+ if !next_fetch.is_consumed() {
Review Comment:
I found this to be duplicated with
https://github.com/apache/fluss-rust/pull/143/files#diff-540a310c6f8139ea4592281917afecdd364792ffc3655a65221307465cabd09eR1041
and I think this piece of code is error-prone. I'm wondering whether the
following code would be more straightforward and more consistent with the
Java side, to avoid potential bugs:
```
let collect_result: Result<()> = {
while records_remaining > 0 {
// Get the next in line fetch, or get a new one from buffer
let next_in_line =
self.log_fetch_buffer.next_in_line_fetch();
if next_in_line.is_none() ||
next_in_line.as_ref().unwrap().is_consumed() {
// Get a new fetch from buffer
if let Some(completed_fetch) =
self.log_fetch_buffer.poll() {
// Initialize the fetch if not already initialized
if !completed_fetch.is_initialized() {
let size_in_bytes =
completed_fetch.size_in_bytes();
match self.initialize_fetch(completed_fetch) {
Ok(initialized) => {
self.log_fetch_buffer.set_next_in_line_fetch(initialized);
continue;
}
Err(e) => {
if result.is_empty() && size_in_bytes ==
0 {
// TODO: add completed_fetch back to log_fetch_buffer;
// otherwise we will skip this fetch error by mistake, which causes
// data loss that the user never has a chance to learn about.
}
return Err(e);
}
}
} else {
self.log_fetch_buffer
.set_next_in_line_fetch(Some(completed_fetch));
}
} else {
// No more fetches available
break;
}
} else {
// Fetch records from next_in_line
if let Some(mut next_fetch) = next_in_line {
let records =
self.fetch_records_from_fetch(&mut next_fetch,
records_remaining)?;
if !records.is_empty() {
let table_bucket =
next_fetch.table_bucket().clone();
// Merge with existing records for this bucket
let existing =
result.entry(table_bucket).or_default();
let records_count = records.len();
existing.extend(records);
records_remaining =
records_remaining.saturating_sub(records_count);
}
// If the fetch is not fully consumed, put it back
for the next round
if !next_fetch.is_consumed() {
self.log_fetch_buffer
.set_next_in_line_fetch(Some(next_fetch));
}
// If consumed, next_fetch will be dropped here
(which is correct)
}
}
}
Ok(())
};
match collect_result {
Ok(_) => Ok(result),
Err(e) => {
// If we have already collected records, suppress the
exception
// and return the partial results to the user first.
if result.is_empty() {
Err(e)
} else {
// Swallow the exception and return the currently
accumulated result.
// The error will likely be re-triggered during the next
poll attempt.
Ok(result)
}
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]