zhaohaidao commented on code in PR #57:
URL: https://github.com/apache/fluss-rust/pull/57#discussion_r2583898666
##########
crates/fluss/src/record/arrow.rs:
##########
@@ -473,30 +479,39 @@ impl<'a> LogRecordBatch<'a> {
}
pub fn records(&self, read_context: ReadContext) -> LogRecordIterator {
- let count = self.record_count();
- if count == 0 {
+ if self.record_count() == 0 {
return LogRecordIterator::empty();
}
- // get arrow_metadata
- let arrow_metadata_bytes = read_context.to_arrow_metadata().unwrap();
- // arrow_batch_data
let data = &self.data[RECORDS_OFFSET..];
+ let (batch_metadata, body_buffer, version) = match
Self::parse_ipc_message(data) {
+ Some(result) => result,
+ None => return LogRecordIterator::empty(),
+ };
- // need to combine arrow_metadata_bytes + arrow_batch_data
- let cursor = Cursor::new([&arrow_metadata_bytes, data].concat());
- let mut stream_reader = StreamReader::try_new(cursor, None).unwrap();
-
- let mut record_batch = None;
- if let Some(bath) = stream_reader.next() {
- record_batch = Some(bath.unwrap());
- }
-
- if record_batch.is_none() {
- return LogRecordIterator::empty();
- }
+ let projection = if read_context.is_projection_pushdown() {
+ None
+ } else {
+ read_context.projected_fields()
+ };
+ let schema_to_use = read_context.arrow_schema.clone();
+
+ let record_batch = match read_record_batch(
+ &body_buffer,
+ batch_metadata,
+ schema_to_use,
+ &std::collections::HashMap::new(),
+ projection,
+ &version,
+ ) {
+ Ok(batch) => batch,
+ Err(e) => {
Review Comment:
Makes sense.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org