Copilot commented on code in PR #57:
URL: https://github.com/apache/fluss-rust/pull/57#discussion_r2553954669


##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -209,13 +273,23 @@ impl LogFetcher {
         if ready_for_fetch_count == 0 {
             HashMap::new()
         } else {
+            let (projection_enabled, projected_fields) = if let Some(fields) = 
&self.projected_fields {
+                if fields.is_empty() {
+                    (false, vec![])
+                } else {

Review Comment:
   Converting `usize` to `i32` using `as i32` silently truncates any value that 
exceeds `i32::MAX`, which a `usize` can do on both 32-bit and 64-bit targets. 
This could cause issues when projecting columns with very high indices. Consider 
either:
   1. Using a bounded type for column indices (e.g., validating they're within 
i32 range)
   2. Changing the protobuf field type to support larger indices
   3. Adding an explicit check and error if the index exceeds i32::MAX
   ```suggestion
                   } else {
                       // Check for out-of-range indices before converting
                       if let Some(&idx) = fields.iter().find(|&&i| i > 
i32::MAX as usize) {
                           // Return early with error if any index is too large
                           return HashMap::new(); // Or, if possible, return 
Err(Error::msg(...))
                       }
   ```



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -505,6 +521,46 @@ impl<'a> LogRecordBatch<'a> {
             change_type: ChangeType::AppendOnly,
         })
     }
+
+    /// Parse an Arrow IPC message from a byte slice.
+    ///
+    /// Server returns RecordBatch message (without Schema message) in the 
encapsulated message format.
+    /// Format: [continuation: 4 bytes (0xFFFFFFFF)][metadata_size: 4 
bytes][RecordBatch metadata][body]
+    ///
+    /// This format is documented at:
+    /// 
https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
+    ///
+    /// Returns the RecordBatch metadata, body buffer, and metadata version.
+    fn parse_ipc_message(
+        data: &'a [u8],
+    ) -> Option<(arrow::ipc::RecordBatch<'a>, Buffer, 
arrow::ipc::MetadataVersion)> {
+        const CONTINUATION_MARKER: u32 = 0xFFFFFFFF;
+        
+        if data.len() < 8 {
+            return None;
+        }
+        
+        let continuation = u32::from_le_bytes([data[0], data[1], data[2], 
data[3]]);
+        let metadata_size = u32::from_le_bytes([data[4], data[5], data[6], 
data[7]]) as usize;
+        
+        if continuation != CONTINUATION_MARKER {
+            return None;
+        }
+        

Review Comment:
   [nitpick] Trailing whitespace detected. Please remove it to maintain code 
cleanliness.
   ```suggestion
   
   ```



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -149,7 +184,36 @@ impl LogFetcher {
             for pb_fetch_log_resp in fetch_response.tables_resp {
                 let table_id = pb_fetch_log_resp.table_id;
                 let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
-                let arrow_schema = 
to_arrow_schema(self.table_info.get_row_type());
+                
+                let full_arrow_schema = 
to_arrow_schema(self.table_info.get_row_type());
+                let read_context = if let Some(projected_fields) = 
&self.projected_fields {
+                    let (projection_enabled, _) = if 
!projected_fields.is_empty() {
+                        (true, projected_fields.clone())
+                    } else {
+                        (false, vec![])
+                    };
+                    
+                    if projection_enabled {
+                        let projected_schema = arrow_schema::Schema::new(
+                            projected_fields
+                                .iter()
+                                .map(|&idx| 
full_arrow_schema.field(idx).clone())
+                                .collect::<Vec<_>>(),
+                        );
+                        ReadContext::with_projection_pushdown(
+                            Arc::new(projected_schema),
+                            Some(projected_fields.clone()),
+                        )
+                    } else {
+                        ReadContext::with_projection(
+                            full_arrow_schema,
+                            self.projected_fields.clone(),
+                        )
+                    }
+                } else {
+                    ReadContext::new(full_arrow_schema)
+                };

Review Comment:
   The logic for determining whether projection is enabled is duplicated and 
overly complex. The condition `if !projected_fields.is_empty()` is checked to 
set `projection_enabled`, and the same logic is repeated in lines 276-284 
in `prepare_fetch_log_requests`. Consider extracting this logic into a helper 
method or simplifying the nested conditions. Additionally, the second element 
of the tuple on line 191 is bound to `_` and never used, so the tuple (and the 
`projected_fields.clone()` that populates it) is unnecessary and should be 
removed in favor of a plain boolean.



##########
crates/examples/src/example_projection.rs:
##########
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use clap::Parser;
+use fluss::client::FlussConnection;
+use fluss::config::Config;
+use fluss::error::Result;
+use fluss::metadata::{DataTypes, LogFormat, Schema, TableDescriptor, 
TablePath};
+use fluss::row::{GenericRow, InternalRow};
+use std::time::Duration;
+
+#[tokio::main]
+pub async fn main() -> Result<()> {
+    env_logger::init();
+    

Review Comment:
   [nitpick] Trailing whitespace detected on this line. Please remove it to 
maintain code cleanliness.
   ```suggestion
   
   ```



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -505,6 +521,46 @@ impl<'a> LogRecordBatch<'a> {
             change_type: ChangeType::AppendOnly,
         })
     }
+
+    /// Parse an Arrow IPC message from a byte slice.
+    ///
+    /// Server returns RecordBatch message (without Schema message) in the 
encapsulated message format.
+    /// Format: [continuation: 4 bytes (0xFFFFFFFF)][metadata_size: 4 
bytes][RecordBatch metadata][body]
+    ///
+    /// This format is documented at:
+    /// 
https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
+    ///
+    /// Returns the RecordBatch metadata, body buffer, and metadata version.
+    fn parse_ipc_message(
+        data: &'a [u8],
+    ) -> Option<(arrow::ipc::RecordBatch<'a>, Buffer, 
arrow::ipc::MetadataVersion)> {
+        const CONTINUATION_MARKER: u32 = 0xFFFFFFFF;
+        

Review Comment:
   [nitpick] Trailing whitespace detected. Please remove it to maintain code 
cleanliness.
   ```suggestion
   
   ```



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -505,6 +521,46 @@ impl<'a> LogRecordBatch<'a> {
             change_type: ChangeType::AppendOnly,
         })
     }
+
+    /// Parse an Arrow IPC message from a byte slice.
+    ///
+    /// Server returns RecordBatch message (without Schema message) in the 
encapsulated message format.
+    /// Format: [continuation: 4 bytes (0xFFFFFFFF)][metadata_size: 4 
bytes][RecordBatch metadata][body]
+    ///
+    /// This format is documented at:
+    /// 
https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
+    ///
+    /// Returns the RecordBatch metadata, body buffer, and metadata version.
+    fn parse_ipc_message(
+        data: &'a [u8],
+    ) -> Option<(arrow::ipc::RecordBatch<'a>, Buffer, 
arrow::ipc::MetadataVersion)> {
+        const CONTINUATION_MARKER: u32 = 0xFFFFFFFF;
+        
+        if data.len() < 8 {
+            return None;
+        }
+        

Review Comment:
   [nitpick] Trailing whitespace detected. Please remove it to maintain code 
cleanliness.
   ```suggestion
   
   ```



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -149,7 +184,36 @@ impl LogFetcher {
             for pb_fetch_log_resp in fetch_response.tables_resp {
                 let table_id = pb_fetch_log_resp.table_id;
                 let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
-                let arrow_schema = 
to_arrow_schema(self.table_info.get_row_type());
+                
+                let full_arrow_schema = 
to_arrow_schema(self.table_info.get_row_type());
+                let read_context = if let Some(projected_fields) = 
&self.projected_fields {
+                    let (projection_enabled, _) = if 
!projected_fields.is_empty() {
+                        (true, projected_fields.clone())
+                    } else {
+                        (false, vec![])
+                    };
+                    
+                    if projection_enabled {
+                        let projected_schema = arrow_schema::Schema::new(
+                            projected_fields
+                                .iter()
+                                .map(|&idx| 
full_arrow_schema.field(idx).clone())
+                                .collect::<Vec<_>>(),
+                        );

Review Comment:
   Similar to the bounds checking issue in ReadContext::to_arrow_metadata, 
there's no validation when creating the projected schema here. If 
`projected_fields` contains an invalid index, calling 
`full_arrow_schema.field(idx)` will panic. While `TableScan::project` validates 
indices, this code path could still be reached with invalid indices if the 
projection is set up through other means.



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -47,14 +48,43 @@ impl<'a> TableScan<'a> {
             conn,
             table_info,
             metadata,
+            projected_fields: None,
         }
     }
 
-    pub fn create_log_scanner(&self) -> LogScanner {
+    pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
+        let field_count = self.table_info.row_type().fields().len();
+        for &idx in column_indices {
+            if idx >= field_count {
+                return Err(Error::IllegalArgument(format!("Column index {} out 
of range (max: {})", idx, field_count - 1)));
+            }
+        }
+        self.projected_fields = Some(column_indices.to_vec());
+        Ok(self)
+    }
+    
+    pub fn project_by_name(mut self, column_names: &[&str]) -> Result<Self> {
+        let row_type = self.table_info.row_type();
+        let mut indices = Vec::new();
+        
+        for name in column_names {
+            let idx = row_type.fields()
+                .iter()
+                .position(|f| f.name() == *name)
+                .ok_or_else(|| Error::IllegalArgument(format!("Column '{}' not 
found", name)))?;
+            indices.push(idx);
+        }
+        
+        self.projected_fields = Some(indices);
+        Ok(self)
+    }

Review Comment:
   Public API methods `project` and `project_by_name` lack documentation. 
Consider adding doc comments explaining their purpose, parameters, return 
values, and providing usage examples. For instance:
   ```rust
   /// Projects the scan to only include specified columns by their indices.
   ///
   /// # Arguments
   /// * `column_indices` - Zero-based indices of columns to include in the scan
   ///
   /// # Errors
   /// Returns an error if any column index is out of range.
   ///
   /// # Example
   /// ```
   /// let scanner = table.new_scan().project(&[0, 2, 3])?.create_log_scanner();
   /// ```
   pub fn project(mut self, column_indices: &[usize]) -> Result<Self>
   ```



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -473,30 +479,40 @@ impl<'a> LogRecordBatch<'a> {
     }
 
     pub fn records(&self, read_context: ReadContext) -> LogRecordIterator {
-        let count = self.record_count();
-        if count == 0 {
+        if self.record_count() == 0 {
             return LogRecordIterator::empty();
         }
 
-        // get arrow_metadata
-        let arrow_metadata_bytes = read_context.to_arrow_metadata().unwrap();
-        // arrow_batch_data
         let data = &self.data[RECORDS_OFFSET..];
+        let (batch_metadata, body_buffer, version) = match 
Self::parse_ipc_message(data) {
+            Some(result) => result,
+            None => return LogRecordIterator::empty(),
+        };
 
-        // need to combine arrow_metadata_bytes + arrow_batch_data
-        let cursor = Cursor::new([&arrow_metadata_bytes, data].concat());
-        let mut stream_reader = StreamReader::try_new(cursor, None).unwrap();
-
-        let mut record_batch = None;
-        if let Some(bath) = stream_reader.next() {
-            record_batch = Some(bath.unwrap());
-        }
+        let (schema_to_use, projection) = if 
read_context.is_projection_pushdown() {
+            (read_context.arrow_schema.clone(), None)
+        } else if let Some(projected_fields) = read_context.projected_fields() 
{
+            (read_context.arrow_schema.clone(), Some(projected_fields))
+        } else {
+            (read_context.arrow_schema.clone(), None)
+        };

Review Comment:
   [nitpick] The logic in lines 492-498 can be simplified to just:
   ```rust
   let projection = if read_context.is_projection_pushdown() {
       None
   } else {
       read_context.projected_fields().map(|f| f.to_vec())
   };
   let schema_to_use = read_context.arrow_schema.clone();
   ```
   This achieves the same result with clearer intent.
   ```suggestion
           let projection = if read_context.is_projection_pushdown() {
               None
           } else {
               read_context.projected_fields().map(|f| f.to_vec())
           };
           let schema_to_use = read_context.arrow_schema.clone();
   ```



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -554,20 +610,62 @@ pub fn to_arrow_type(fluss_type: &DataType) -> 
ArrowDataType {
     }
 }
 
+#[derive(Clone)]
 pub struct ReadContext {
     arrow_schema: SchemaRef,
+    projected_fields: Option<Vec<usize>>,
+    projection_pushdown: bool,
 }
 
 impl ReadContext {
     pub fn new(arrow_schema: SchemaRef) -> ReadContext {
-        ReadContext { arrow_schema }
+        ReadContext {
+            arrow_schema,
+            projected_fields: None,
+            projection_pushdown: false,
+        }
+    }
+
+    pub fn with_projection(arrow_schema: SchemaRef, projected_fields: 
Option<Vec<usize>>) -> ReadContext {
+        ReadContext {
+            arrow_schema,
+            projected_fields,
+            projection_pushdown: false,
+        }
+    }
+
+    pub fn with_projection_pushdown(arrow_schema: SchemaRef, projected_fields: 
Option<Vec<usize>>) -> ReadContext {
+        ReadContext {
+            arrow_schema,
+            projected_fields,
+            projection_pushdown: true,
+        }
+    }
+
+    pub fn is_projection_pushdown(&self) -> bool {
+        self.projection_pushdown
     }
 
     pub fn to_arrow_metadata(&self) -> Result<Vec<u8>> {
         let mut arrow_schema_bytes = vec![];
-        let _writer = StreamWriter::try_new(&mut arrow_schema_bytes, 
&self.arrow_schema)?;
+        let schema_to_use = if let Some(projected_fields) = 
&self.projected_fields {
+            let projected_schema = arrow_schema::Schema::new(
+                projected_fields
+                    .iter()
+                    .map(|&idx| self.arrow_schema.field(idx).clone())
+                    .collect::<Vec<_>>(),

Review Comment:
   There's no bounds checking when accessing `self.arrow_schema.field(idx)`. If 
`projected_fields` contains invalid indices (greater than or equal to the 
schema's field count), this will panic. While validation exists in 
`TableScan::project` (lines 57-60 in scanner.rs), the `ReadContext` can be 
constructed directly with `with_projection` or `with_projection_pushdown` 
methods, bypassing this validation. Consider adding bounds checking here or 
documenting the precondition that indices must be valid.



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -149,7 +184,36 @@ impl LogFetcher {
             for pb_fetch_log_resp in fetch_response.tables_resp {
                 let table_id = pb_fetch_log_resp.table_id;
                 let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
-                let arrow_schema = 
to_arrow_schema(self.table_info.get_row_type());
+                
+                let full_arrow_schema = 
to_arrow_schema(self.table_info.get_row_type());
+                let read_context = if let Some(projected_fields) = 
&self.projected_fields {
+                    let (projection_enabled, _) = if 
!projected_fields.is_empty() {
+                        (true, projected_fields.clone())
+                    } else {
+                        (false, vec![])
+                    };
+                    
+                    if projection_enabled {
+                        let projected_schema = arrow_schema::Schema::new(
+                            projected_fields
+                                .iter()
+                                .map(|&idx| 
full_arrow_schema.field(idx).clone())
+                                .collect::<Vec<_>>(),
+                        );

Review Comment:
   This code for building a projected schema is duplicated from the logic in 
`ReadContext::to_arrow_metadata` (lines 652-658 in arrow.rs). This duplication 
creates a maintenance burden and increases the risk of inconsistencies. 
Consider extracting this logic into a shared helper method.
   ```suggestion
                           let projected_schema = 
build_projected_schema(&full_arrow_schema, &projected_fields);
   ```



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -149,7 +184,36 @@ impl LogFetcher {
             for pb_fetch_log_resp in fetch_response.tables_resp {
                 let table_id = pb_fetch_log_resp.table_id;
                 let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
-                let arrow_schema = 
to_arrow_schema(self.table_info.get_row_type());
+                
+                let full_arrow_schema = 
to_arrow_schema(self.table_info.get_row_type());
+                let read_context = if let Some(projected_fields) = 
&self.projected_fields {
+                    let (projection_enabled, _) = if 
!projected_fields.is_empty() {
+                        (true, projected_fields.clone())
+                    } else {
+                        (false, vec![])
+                    };

Review Comment:
   [nitpick] The check for empty projected fields (line 190) and subsequent 
handling creates unnecessary complexity. If `projected_fields` is 
`Some(vec![])` (an empty vector), it should probably be treated the same as 
`None` - i.e., fetch all fields. The current logic sets `projection_enabled = 
false` for empty vectors, but it's unclear why an empty projection list would 
ever be intentionally created. Consider either preventing empty vectors from 
being set in the first place (e.g., in the `project` method) or simplifying 
this logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to