Copilot commented on code in PR #127:
URL: https://github.com/apache/fluss-rust/pull/127#discussion_r2668676256
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -719,6 +823,134 @@ impl LogFetcher {
}
}
+    /// Collect completed fetches as RecordBatches
+    fn collect_batches(&self) -> Result<Vec<RecordBatch>> {
+        // Limit memory usage with both batch count and byte size constraints.
+        // Max 100 batches per poll, but also check total bytes (soft cap ~64MB).
+        const MAX_BATCHES: usize = 100;
+        const MAX_BYTES: usize = 64 * 1024 * 1024; // 64MB soft cap
+        let mut result: Vec<RecordBatch> = Vec::new();
+        let mut batches_remaining = MAX_BATCHES;
+        let mut bytes_consumed: usize = 0;
+
+        while batches_remaining > 0 && bytes_consumed < MAX_BYTES {
+            let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
+
+            match next_in_line {
+                None => {
+                    if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
+                        if !completed_fetch.is_initialized() {
+                            let size_in_bytes = completed_fetch.size_in_bytes();
+                            match self.initialize_fetch(completed_fetch) {
+                                Ok(initialized) => {
+                                    self.log_fetch_buffer.set_next_in_line_fetch(initialized);
+                                    continue;
+                                }
+                                Err(e) => {
+                                    if result.is_empty() && size_in_bytes == 0 {
+                                        continue;
+                                    }
+                                    return Err(e);
+                                }
+                            }
+                        } else {
+                            self.log_fetch_buffer
+                                .set_next_in_line_fetch(Some(completed_fetch));
+                        }
+                    } else {
+                        break;
+                    }
+                }
+                Some(ref f) if f.is_consumed() => {
+                    if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
+                        if !completed_fetch.is_initialized() {
+                            let size_in_bytes = completed_fetch.size_in_bytes();
+                            match self.initialize_fetch(completed_fetch) {
+                                Ok(initialized) => {
+                                    self.log_fetch_buffer.set_next_in_line_fetch(initialized);
+                                    continue;
+                                }
+                                Err(e) => {
+                                    if result.is_empty() && size_in_bytes == 0 {
+                                        continue;
+                                    }
+                                    return Err(e);
+                                }
+                            }
+                        } else {
+                            self.log_fetch_buffer
+                                .set_next_in_line_fetch(Some(completed_fetch));
+                        }
+                    } else {
+                        break;
+                    }
+                }
+                Some(mut next_fetch) => {
+                    let batches =
+                        self.fetch_batches_from_fetch(&mut next_fetch, batches_remaining)?;
+                    let batch_count = batches.len();
+
+                    if !batches.is_empty() {
+                        // Track bytes consumed (soft cap - may exceed by one fetch)
+                        let batch_bytes: usize =
+                            batches.iter().map(|b| b.get_array_memory_size()).sum();
+                        bytes_consumed += batch_bytes;
+
+                        result.extend(batches);
+                        batches_remaining = batches_remaining.saturating_sub(batch_count);
+                    }
+
+                    if !next_fetch.is_consumed() {
+                        self.log_fetch_buffer
+                            .set_next_in_line_fetch(Some(next_fetch));
+                    }
+                }
+            }
+        }
+
+        Ok(result)
+    }
+
+    fn fetch_batches_from_fetch(
+        &self,
+        next_in_line_fetch: &mut Box<dyn CompletedFetch>,
+        max_batches: usize,
+    ) -> Result<Vec<RecordBatch>> {
+        let table_bucket = next_in_line_fetch.table_bucket().clone();
+        let current_offset = self.log_scanner_status.get_bucket_offset(&table_bucket);
+
+        if current_offset.is_none() {
+            warn!(
+                "Ignoring fetched batches for {:?} since bucket was unsubscribed",
+                table_bucket
Review Comment:
The warning messages here are formatted differently from the similar messages
in `fetch_records_from_fetch` (lines 794-796 and 818-820).
`fetch_records_from_fetch` uses inline format-args capture, e.g. "Ignoring
fetched records for {table_bucket:?}", while these messages pass positional
arguments, e.g. "Ignoring fetched batches for {:?}". For consistency within the
codebase, these messages should use the same inline capture style.
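
For reference, a minimal sketch of this first message in the interpolated
style (assuming the same `warn!` macro already imported in this file and
Rust 2021 inline format-args capture):
```rust
// Sketch only: same message, with the identifier captured inline so the
// style matches fetch_records_from_fetch.
warn!("Ignoring fetched batches for {table_bucket:?} since bucket was unsubscribed");
```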
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -719,6 +823,134 @@ impl LogFetcher {
}
}
+    /// Collect completed fetches as RecordBatches
+    fn collect_batches(&self) -> Result<Vec<RecordBatch>> {
+        // Limit memory usage with both batch count and byte size constraints.
+        // Max 100 batches per poll, but also check total bytes (soft cap ~64MB).
+        const MAX_BATCHES: usize = 100;
+        const MAX_BYTES: usize = 64 * 1024 * 1024; // 64MB soft cap
+        let mut result: Vec<RecordBatch> = Vec::new();
+        let mut batches_remaining = MAX_BATCHES;
+        let mut bytes_consumed: usize = 0;
+
+        while batches_remaining > 0 && bytes_consumed < MAX_BYTES {
+            let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
+
+            match next_in_line {
+                None => {
+                    if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
+                        if !completed_fetch.is_initialized() {
+                            let size_in_bytes = completed_fetch.size_in_bytes();
+                            match self.initialize_fetch(completed_fetch) {
+                                Ok(initialized) => {
+                                    self.log_fetch_buffer.set_next_in_line_fetch(initialized);
+                                    continue;
+                                }
+                                Err(e) => {
+                                    if result.is_empty() && size_in_bytes == 0 {
+                                        continue;
+                                    }
+                                    return Err(e);
+                                }
+                            }
+                        } else {
+                            self.log_fetch_buffer
+                                .set_next_in_line_fetch(Some(completed_fetch));
+                        }
+                    } else {
+                        break;
+                    }
+                }
+                Some(ref f) if f.is_consumed() => {
+                    if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
+                        if !completed_fetch.is_initialized() {
+                            let size_in_bytes = completed_fetch.size_in_bytes();
+                            match self.initialize_fetch(completed_fetch) {
+                                Ok(initialized) => {
+                                    self.log_fetch_buffer.set_next_in_line_fetch(initialized);
+                                    continue;
+                                }
+                                Err(e) => {
+                                    if result.is_empty() && size_in_bytes == 0 {
+                                        continue;
+                                    }
+                                    return Err(e);
+                                }
+                            }
+                        } else {
+                            self.log_fetch_buffer
+                                .set_next_in_line_fetch(Some(completed_fetch));
+                        }
+                    } else {
+                        break;
Review Comment:
The code blocks at lines 840-863 and 864-887 are nearly identical
duplicates. Both handle the case where a completed fetch needs to be polled and
initialized. Consider extracting this common logic into a helper method to
improve maintainability and reduce the risk of bugs when one branch is updated
but not the other.
```suggestion
                    enum LoopAction {
                        Break,
                        Continue,
                        None,
                    }
                    let handle_completed_fetch =
                        |result: &mut Vec<RecordBatch>| -> Result<LoopAction> {
                            if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
                                if !completed_fetch.is_initialized() {
                                    let size_in_bytes = completed_fetch.size_in_bytes();
                                    match self.initialize_fetch(completed_fetch) {
                                        Ok(initialized) => {
                                            self.log_fetch_buffer
                                                .set_next_in_line_fetch(initialized);
                                            Ok(LoopAction::Continue)
                                        }
                                        Err(e) => {
                                            if result.is_empty() && size_in_bytes == 0 {
                                                Ok(LoopAction::Continue)
                                            } else {
                                                Err(e)
                                            }
                                        }
                                    }
                                } else {
                                    self.log_fetch_buffer
                                        .set_next_in_line_fetch(Some(completed_fetch));
                                    Ok(LoopAction::None)
                                }
                            } else {
                                Ok(LoopAction::Break)
                            }
                        };
                    match handle_completed_fetch(&mut result)? {
                        LoopAction::Break => break,
                        LoopAction::Continue => continue,
                        LoopAction::None => {}
                    }
                }
                Some(ref f) if f.is_consumed() => {
                    match handle_completed_fetch(&mut result)? {
                        LoopAction::Break => break,
                        LoopAction::Continue => continue,
                        LoopAction::None => {}
```
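
If a closure feels too heavy here, the same logic could also live in a private
helper method on the fetcher, which is closer to the comment's wording. A rough
sketch, assuming `poll`, `is_initialized`, `size_in_bytes`,
`set_next_in_line_fetch`, and `initialize_fetch` behave as shown in the diff;
the method name, the `have_results` flag, and the reuse of the `LoopAction`
enum and the crate's `Result` alias are illustrative only:
```rust
/// Hypothetical helper: poll one completed fetch and either stage it as the
/// next-in-line fetch or tell the caller's loop how to proceed.
fn poll_and_stage_next_fetch(&self, have_results: bool) -> Result<LoopAction> {
    match self.log_fetch_buffer.poll() {
        // Nothing buffered: the collect loop can stop.
        None => Ok(LoopAction::Break),
        // Already initialized: stage it and let the loop pick it up in the
        // `Some(mut next_fetch)` arm on the next iteration.
        Some(completed_fetch) if completed_fetch.is_initialized() => {
            self.log_fetch_buffer
                .set_next_in_line_fetch(Some(completed_fetch));
            Ok(LoopAction::None)
        }
        // Needs initialization first.
        Some(completed_fetch) => {
            let size_in_bytes = completed_fetch.size_in_bytes();
            match self.initialize_fetch(completed_fetch) {
                Ok(initialized) => {
                    self.log_fetch_buffer.set_next_in_line_fetch(initialized);
                    Ok(LoopAction::Continue)
                }
                // Swallow the error only when nothing has been collected yet
                // and the failed fetch was empty, mirroring the inline code.
                Err(_) if !have_results && size_in_bytes == 0 => Ok(LoopAction::Continue),
                Err(e) => Err(e),
            }
        }
    }
}
```
Both the `None` and the `Some(ref f) if f.is_consumed()` arms would then reduce
to a single `match self.poll_and_stage_next_fetch(!result.is_empty())? { ... }`.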
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -719,6 +823,134 @@ impl LogFetcher {
}
}
+    /// Collect completed fetches as RecordBatches
+    fn collect_batches(&self) -> Result<Vec<RecordBatch>> {
+        // Limit memory usage with both batch count and byte size constraints.
+        // Max 100 batches per poll, but also check total bytes (soft cap ~64MB).
+        const MAX_BATCHES: usize = 100;
+        const MAX_BYTES: usize = 64 * 1024 * 1024; // 64MB soft cap
+        let mut result: Vec<RecordBatch> = Vec::new();
+        let mut batches_remaining = MAX_BATCHES;
+        let mut bytes_consumed: usize = 0;
+
+        while batches_remaining > 0 && bytes_consumed < MAX_BYTES {
+            let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
+
+            match next_in_line {
+                None => {
+                    if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
+                        if !completed_fetch.is_initialized() {
+                            let size_in_bytes = completed_fetch.size_in_bytes();
+                            match self.initialize_fetch(completed_fetch) {
+                                Ok(initialized) => {
+                                    self.log_fetch_buffer.set_next_in_line_fetch(initialized);
+                                    continue;
+                                }
+                                Err(e) => {
+                                    if result.is_empty() && size_in_bytes == 0 {
+                                        continue;
+                                    }
+                                    return Err(e);
+                                }
+                            }
+                        } else {
+                            self.log_fetch_buffer
+                                .set_next_in_line_fetch(Some(completed_fetch));
+                        }
+                    } else {
+                        break;
+                    }
+                }
+                Some(ref f) if f.is_consumed() => {
+                    if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
+                        if !completed_fetch.is_initialized() {
+                            let size_in_bytes = completed_fetch.size_in_bytes();
+                            match self.initialize_fetch(completed_fetch) {
+                                Ok(initialized) => {
+                                    self.log_fetch_buffer.set_next_in_line_fetch(initialized);
+                                    continue;
+                                }
+                                Err(e) => {
+                                    if result.is_empty() && size_in_bytes == 0 {
+                                        continue;
+                                    }
+                                    return Err(e);
+                                }
+                            }
+                        } else {
+                            self.log_fetch_buffer
+                                .set_next_in_line_fetch(Some(completed_fetch));
+                        }
+                    } else {
+                        break;
+                    }
+                }
+                Some(mut next_fetch) => {
+                    let batches =
+                        self.fetch_batches_from_fetch(&mut next_fetch, batches_remaining)?;
+                    let batch_count = batches.len();
+
+                    if !batches.is_empty() {
+                        // Track bytes consumed (soft cap - may exceed by one fetch)
+                        let batch_bytes: usize =
+                            batches.iter().map(|b| b.get_array_memory_size()).sum();
+                        bytes_consumed += batch_bytes;
+
+                        result.extend(batches);
+                        batches_remaining = batches_remaining.saturating_sub(batch_count);
+                    }
+
+                    if !next_fetch.is_consumed() {
+                        self.log_fetch_buffer
+                            .set_next_in_line_fetch(Some(next_fetch));
+                    }
+                }
+            }
+        }
+
+        Ok(result)
+    }
+
+    fn fetch_batches_from_fetch(
+        &self,
+        next_in_line_fetch: &mut Box<dyn CompletedFetch>,
+        max_batches: usize,
+    ) -> Result<Vec<RecordBatch>> {
+        let table_bucket = next_in_line_fetch.table_bucket().clone();
+        let current_offset = self.log_scanner_status.get_bucket_offset(&table_bucket);
+
+        if current_offset.is_none() {
+            warn!(
+                "Ignoring fetched batches for {:?} since bucket was unsubscribed",
+                table_bucket
+            );
+            next_in_line_fetch.drain();
+            return Ok(Vec::new());
+        }
+
+        let current_offset = current_offset.unwrap();
+        let fetch_offset = next_in_line_fetch.next_fetch_offset();
+
+        if fetch_offset == current_offset {
+            let batches = next_in_line_fetch.fetch_batches(max_batches)?;
+            let next_fetch_offset = next_in_line_fetch.next_fetch_offset();
+
+            if next_fetch_offset > current_offset {
+                self.log_scanner_status
+                    .update_offset(&table_bucket, next_fetch_offset);
+            }
+
+            Ok(batches)
+        } else {
+            warn!(
+                "Ignoring fetched batches for {:?} at offset {}, expected {}",
+                table_bucket, fetch_offset, current_offset
+            );
Review Comment:
The warning messages here are formatted differently from the similar messages
in `fetch_records_from_fetch` (lines 794-796 and 818-820), which use inline
format-args capture such as "Ignoring fetched records for {table_bucket:?}",
while these messages pass positional arguments. For consistency within the
codebase, these messages should use the same inline capture style.
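
As above, a minimal sketch of the offset-mismatch message in the interpolated
style (same assumptions; the captured names come from the surrounding
function):
```rust
// Sketch only: positional arguments replaced by inline captures.
warn!(
    "Ignoring fetched batches for {table_bucket:?} at offset {fetch_offset}, expected {current_offset}"
);
```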