luoyuxia commented on code in PR #57:
URL: https://github.com/apache/fluss-rust/pull/57#discussion_r2554011553


##########
crates/examples/src/example_projection.rs:
##########
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use clap::Parser;
+use fluss::client::FlussConnection;
+use fluss::config::Config;
+use fluss::error::Result;
+use fluss::metadata::{DataTypes, LogFormat, Schema, TableDescriptor, TablePath};
+use fluss::row::{GenericRow, InternalRow};
+use std::time::Duration;
+
+#[tokio::main]
+pub async fn main() -> Result<()> {

Review Comment:
   Maybe remove `example_projection.rs` and verify projection works in `table_test#append_record_batch`? Rename `append_record_batch` to `append_record_batch_and_scan`?
   If we want to provide a projection example, we can add the projection code to `example_table`, as sketched below.
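   For reference, a minimal sketch of what that projection code in `example_table` might look like. How the scan is obtained and the column names are placeholders; only `project_by_name` and `create_log_scanner` come from this PR's diff:
   ```
   // hypothetical setup: obtain the scan the same way example_table already does
   let log_scanner = table
       .new_scan()                         // placeholder: however example_table builds its TableScan
       .project_by_name(&["id", "name"])?  // project two columns by name (API added in this PR)
       .create_log_scanner();
   ```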



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -554,20 +609,62 @@ pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType {
     }
 }
 
+#[derive(Clone)]
 pub struct ReadContext {
     arrow_schema: SchemaRef,
+    projected_fields: Option<Vec<usize>>,
+    projection_pushdown: bool,
 }
 
 impl ReadContext {
     pub fn new(arrow_schema: SchemaRef) -> ReadContext {
-        ReadContext { arrow_schema }
+        ReadContext {
+            arrow_schema,
+            projected_fields: None,
+            projection_pushdown: false,
+        }
+    }
+
+    pub fn with_projection(arrow_schema: SchemaRef, projected_fields: Option<Vec<usize>>) -> ReadContext {
+        ReadContext {
+            arrow_schema,
+            projected_fields,
+            projection_pushdown: false,
+        }
+    }
+
+    pub fn with_projection_pushdown(arrow_schema: SchemaRef, projected_fields: Option<Vec<usize>>) -> ReadContext {
+        ReadContext {
+            arrow_schema,
+            projected_fields,
+            projection_pushdown: true,
+        }
+    }
+
+    pub fn is_projection_pushdown(&self) -> bool {
+        self.projection_pushdown
     }
 
     pub fn to_arrow_metadata(&self) -> Result<Vec<u8>> {

Review Comment:
   This method can be removed.



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -209,13 +264,23 @@ impl LogFetcher {
         if ready_for_fetch_count == 0 {
             HashMap::new()
         } else {
+            let (projection_enabled, projected_fields) = if let Some(fields) = &self.projected_fields {

Review Comment:
   Suggest doing it like this:
   ```
   let (projection_enabled, projected_fields) = match self.read_context.projected_fields() {
       None => (false, vec![]),
       Some(fields) => (true, fields.iter().map(|&i| i as i32).collect()),
   };
   ```



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -473,30 +479,39 @@ impl<'a> LogRecordBatch<'a> {
     }
 
     pub fn records(&self, read_context: ReadContext) -> LogRecordIterator {
-        let count = self.record_count();
-        if count == 0 {
+        if self.record_count() == 0 {
             return LogRecordIterator::empty();
         }
 
-        // get arrow_metadata
-        let arrow_metadata_bytes = read_context.to_arrow_metadata().unwrap();
-        // arrow_batch_data
         let data = &self.data[RECORDS_OFFSET..];
+        let (batch_metadata, body_buffer, version) = match Self::parse_ipc_message(data) {
+            Some(result) => result,
+            None => return LogRecordIterator::empty(),
+        };
 
-        // need to combine arrow_metadata_bytes + arrow_batch_data
-        let cursor = Cursor::new([&arrow_metadata_bytes, data].concat());
-        let mut stream_reader = StreamReader::try_new(cursor, None).unwrap();
-
-        let mut record_batch = None;
-        if let Some(bath) = stream_reader.next() {
-            record_batch = Some(bath.unwrap());
-        }
-
-        if record_batch.is_none() {
-            return LogRecordIterator::empty();
-        }
+        let projection = if read_context.is_projection_pushdown() {
+            None
+        } else {
+            read_context.projected_fields()
+        };
+        let schema_to_use = read_context.arrow_schema.clone();
+
+        let record_batch = match read_record_batch(
+            &body_buffer,
+            batch_metadata,
+            schema_to_use,
+            &std::collections::HashMap::new(),
+            projection,
+            &version,
+        ) {
+            Ok(batch) => batch,
+            Err(e) => {

Review Comment:
   Make `records` return `Result<LogRecordIterator>` for better error handling.
   ```
   pub fn records(&self, read_context: &ReadContext) -> Result<LogRecordIterator> {
       if self.record_count() == 0 {
           return Ok(LogRecordIterator::empty());
       }

       let data = &self.data[RECORDS_OFFSET..];
       let (batch_metadata, body_buffer, version) = match Self::parse_ipc_message(data) {
           Some(result) => result,
           None => return Ok(LogRecordIterator::empty()),
       };

       let projection = if read_context.is_projection_pushdown() {
           None
       } else {
           read_context.projected_fields()
       };
       let schema_to_use = read_context.arrow_schema.clone();

       let record_batch = read_record_batch(
           &body_buffer,
           batch_metadata,
           schema_to_use,
           &std::collections::HashMap::new(),
           projection,
           &version,
       )?;

       let arrow_reader = ArrowReader::new(Arc::new(record_batch));
       Ok(LogRecordIterator::Arrow(ArrowLogRecordIterator {
           reader: arrow_reader,
           base_offset: self.base_log_offset(),
           timestamp: self.commit_timestamp(),
           row_id: 0,
           change_type: ChangeType::AppendOnly,
       }))
   }
   ```



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -47,14 +48,43 @@ impl<'a> TableScan<'a> {
             conn,
             table_info,
             metadata,
+            projected_fields: None,
         }
     }
 
-    pub fn create_log_scanner(&self) -> LogScanner {
+    pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
+        let field_count = self.table_info.row_type().fields().len();
+        for &idx in column_indices {
+            if idx >= field_count {
+                return Err(Error::IllegalArgument(format!("Column index {} out of range (max: {})", idx, field_count - 1)));
+            }
+        }
+        self.projected_fields = Some(column_indices.to_vec());
+        Ok(self)
+    }
+    
+    pub fn project_by_name(mut self, column_names: &[&str]) -> Result<Self> {
+        let row_type = self.table_info.row_type();
+        let mut indices = Vec::new();
+        
+        for name in column_names {
+            let idx = row_type.fields()
+                .iter()
+                .position(|f| f.name() == *name)
+                .ok_or_else(|| Error::IllegalArgument(format!("Column '{}' not found", name)))?;
+            indices.push(idx);
+        }
+        
+        self.projected_fields = Some(indices);
+        Ok(self)
+    }

Review Comment:
   +1 for Copilot's advice.



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -473,30 +479,39 @@ impl<'a> LogRecordBatch<'a> {
     }
 
     pub fn records(&self, read_context: ReadContext) -> LogRecordIterator {
-        let count = self.record_count();
-        if count == 0 {
+        if self.record_count() == 0 {
             return LogRecordIterator::empty();
         }
 
-        // get arrow_metadata
-        let arrow_metadata_bytes = read_context.to_arrow_metadata().unwrap();
-        // arrow_batch_data
         let data = &self.data[RECORDS_OFFSET..];
+        let (batch_metadata, body_buffer, version) = match Self::parse_ipc_message(data) {
+            Some(result) => result,
+            None => return LogRecordIterator::empty(),
+        };
 
-        // need to combine arrow_metadata_bytes + arrow_batch_data
-        let cursor = Cursor::new([&arrow_metadata_bytes, data].concat());
-        let mut stream_reader = StreamReader::try_new(cursor, None).unwrap();
-
-        let mut record_batch = None;
-        if let Some(bath) = stream_reader.next() {
-            record_batch = Some(bath.unwrap());
-        }
-
-        if record_batch.is_none() {
-            return LogRecordIterator::empty();
-        }
+        let projection = if read_context.is_projection_pushdown() {

Review Comment:
   IIUC, the projection will always be `None` here, right?
   If so, I think we can remove the if/else statement, as sketched below.
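   A sketch of the simplified call (assuming the `Result`-returning `records` suggested above; everything else stays unchanged):
   ```
   let record_batch = read_record_batch(
       &body_buffer,
       batch_metadata,
       read_context.arrow_schema.clone(),
       &std::collections::HashMap::new(),
       None, // projection was already pushed down to the server
       &version,
   )?;
   ```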



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -159,7 +214,7 @@ impl LogFetcher {
                         for log_record in &mut LogRecordsBatchs::new(&data) {
                             let last_offset = log_record.last_log_offset();
                             fetch_records
-                                .extend(log_record.records(ReadContext::new(arrow_schema.clone())));
+                                .extend(log_record.records(read_context.clone()));

Review Comment:
   Then we can use:
   ```
   fetch_records.extend(log_record.records(&self.read_context))
   ```



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -149,7 +184,27 @@ impl LogFetcher {
             for pb_fetch_log_resp in fetch_response.tables_resp {
                 let table_id = pb_fetch_log_resp.table_id;
                 let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
-                let arrow_schema = to_arrow_schema(self.table_info.get_row_type());
+                
+                let full_arrow_schema = to_arrow_schema(self.table_info.get_row_type());
+                let read_context = if self.is_projection_enabled() {

Review Comment:
   We don't need to construct the `read_context` on each fetch; we can create it once when creating the `LogFetcher`. Suggest doing it like this:
   ```
   struct LogFetcher {
       table_path: TablePath,
       conns: Arc<RpcClient>,
       table_info: TableInfo,
       metadata: Arc<Metadata>,
       log_scanner_status: Arc<LogScannerStatus>,
       read_context: ReadContext,
   }
   ```
   
   ```
   impl LogFetcher {
       pub fn new(
           table_info: TableInfo,
           conns: Arc<RpcClient>,
           metadata: Arc<Metadata>,
           log_scanner_status: Arc<LogScannerStatus>,
           projected_fields: Option<Vec<usize>>,
       ) -> Self {
           LogFetcher {
               table_path: table_info.table_path.clone(),
               conns: conns.clone(),
               table_info: table_info.clone(),
               metadata: metadata.clone(),
               log_scanner_status: log_scanner_status.clone(),
               read_context: Self::create_read_context(
                   to_arrow_schema(table_info.get_row_type()),
                   projected_fields,
               ),
           }
       }

       fn create_read_context(table_arrow_schema: SchemaRef, projected_fields: Option<Vec<usize>>) -> ReadContext {
           match projected_fields {
               None => ReadContext::new(table_arrow_schema, None),
               Some(projected_fields) => {
                   let projected_schema = arrow_schema::Schema::new(
                       projected_fields
                           .iter()
                           .map(|&idx| table_arrow_schema.field(idx).clone())
                           .collect::<Vec<_>>(),
                   );
                   ReadContext::new(Arc::new(projected_schema), Some(projected_fields.clone()))
               }
           }
       }
   }
   ```



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -232,6 +297,13 @@ impl LogFetcher {
         }
     }
 
+    fn is_projection_enabled(&self) -> bool {
+        self.projected_fields

Review Comment:
   Can we treat projection as enabled whenever `projected_fields` is not `None`, to keep the logic simple? Then throw an exception in `pub fn project(mut self, column_indices: &[usize])` when `column_indices` is empty, as sketched below.
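   Something like this sketch, built on the `project` shown in the diff above and reusing the existing `Error::IllegalArgument` variant:
   ```
   pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
       // reject empty projections so Some(projected_fields) always means "projection enabled"
       if column_indices.is_empty() {
           return Err(Error::IllegalArgument(
               "column_indices must not be empty".to_string(),
           ));
       }
       let field_count = self.table_info.row_type().fields().len();
       for &idx in column_indices {
           if idx >= field_count {
               return Err(Error::IllegalArgument(format!(
                   "Column index {} out of range (max: {})",
                   idx,
                   field_count - 1
               )));
           }
       }
       self.projected_fields = Some(column_indices.to_vec());
       Ok(self)
   }
   ```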



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -505,6 +520,46 @@ impl<'a> LogRecordBatch<'a> {
             change_type: ChangeType::AppendOnly,
         })
     }
+
+    /// Parse an Arrow IPC message from a byte slice.
+    ///
+    /// Server returns RecordBatch message (without Schema message) in the encapsulated message format.
+    /// Format: [continuation: 4 bytes (0xFFFFFFFF)][metadata_size: 4 bytes][RecordBatch metadata][body]
+    ///
+    /// This format is documented at:
+    /// https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
+    ///
+    /// Returns the RecordBatch metadata, body buffer, and metadata version.
+    fn parse_ipc_message(
+        data: &'a [u8],
+    ) -> Option<(arrow::ipc::RecordBatch<'a>, Buffer, arrow::ipc::MetadataVersion)> {
+        const CONTINUATION_MARKER: u32 = 0xFFFFFFFF;
+        
+        if data.len() < 8 {

Review Comment:
   IIUC, `if data.len() < 8 {`, `continuation != CONTINUATION_MARKER`, and `if data.len() < 8 + metadata_size` should all be unexpected, right?
   I think we should return an error instead of returning `None`.
   
   You can make it return an error in this PR or in another PR (but remember to leave a TODO). A sketch follows below.
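   For example, the header checks could be converted like this sketch (using a hypothetical `check_ipc_header` helper and the existing `Error::IllegalArgument` variant; a more specific error variant may fit better if one exists):
   ```
   // hypothetical Result-returning header validation for parse_ipc_message
   fn check_ipc_header(data: &[u8]) -> Result<usize> {
       const CONTINUATION_MARKER: u32 = 0xFFFFFFFF;
       if data.len() < 8 {
           return Err(Error::IllegalArgument(format!(
               "Arrow IPC message too short: {} bytes",
               data.len()
           )));
       }
       let continuation = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);
       if continuation != CONTINUATION_MARKER {
           return Err(Error::IllegalArgument(format!(
               "missing Arrow IPC continuation marker: 0x{:08X}",
               continuation
           )));
       }
       let metadata_size = u32::from_le_bytes([data[4], data[5], data[6], data[7]]) as usize;
       if data.len() < 8 + metadata_size {
           return Err(Error::IllegalArgument(format!(
               "truncated Arrow IPC metadata: need {} bytes, have {}",
               8 + metadata_size,
               data.len()
           )));
       }
       Ok(metadata_size)
   }
   ```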



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -554,20 +609,62 @@ pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType {
     }
 }
 
+#[derive(Clone)]
 pub struct ReadContext {

Review Comment:
   Maybe we can change this?
   ```
   #[derive(Clone)]
   pub struct ReadContext {
       arrow_schema: SchemaRef,
       projected_fields: Option<Vec<usize>>,
   }
   
   impl ReadContext {
       pub fn new(arrow_schema: SchemaRef, projected_fields: Option<Vec<usize>>) -> ReadContext {
           ReadContext {
               arrow_schema,
               projected_fields,
           }
       }
   
       pub fn is_projection_pushdown(&self) -> bool {
           self.projected_fields.is_some()
       }
   
       pub fn projected_fields(&self) -> Option<&[usize]> {
           self.projected_fields.as_deref()
       }
   }
   ```



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -505,6 +520,46 @@ impl<'a> LogRecordBatch<'a> {
             change_type: ChangeType::AppendOnly,
         })
     }
+
+    /// Parse an Arrow IPC message from a byte slice.
+    ///
+    /// Server returns RecordBatch message (without Schema message) in the encapsulated message format.
+    /// Format: [continuation: 4 bytes (0xFFFFFFFF)][metadata_size: 4 bytes][RecordBatch metadata][body]
+    ///
+    /// This format is documented at:
+    /// https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
+    ///
+    /// Returns the RecordBatch metadata, body buffer, and metadata version.
+    fn parse_ipc_message(
+        data: &'a [u8],
+    ) -> Option<(arrow::ipc::RecordBatch<'a>, Buffer, arrow::ipc::MetadataVersion)> {
+        const CONTINUATION_MARKER: u32 = 0xFFFFFFFF;
+        
+        if data.len() < 8 {
+            return None;
+        }
+        
+        let continuation = u32::from_le_bytes([data[0], data[1], data[2], data[3]]);

Review Comment:
   nit:
   ```suggestion
           let continuation = LittleEndian::read_u32(&data[0..4]);
           let metadata_size = LittleEndian::read_u32(&data[4..8]) as usize;
   ```
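   (Assuming `LittleEndian` here comes from the `byteorder` crate, this would also need `use byteorder::{ByteOrder, LittleEndian};`.)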



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -39,6 +39,7 @@ pub struct TableScan<'a> {
     conn: &'a FlussConnection,
     table_info: TableInfo,
     metadata: Arc<Metadata>,
+    projected_fields: Option<Vec<usize>>,

Review Comment:
   nit: add comment for `projected_fields`?
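   For example, something like:
   ```
   /// Column indices to project when scanning; `None` means read all columns.
   projected_fields: Option<Vec<usize>>,
   ```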



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -232,6 +297,13 @@ impl LogFetcher {
         }
     }
 
+    fn is_projection_enabled(&self) -> bool {
+        self.projected_fields

Review Comment:
   But then it seems this method can be removed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
