Copilot commented on code in PR #57:
URL: https://github.com/apache/fluss-rust/pull/57#discussion_r2585871798


##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -122,13 +196,29 @@ impl LogFetcher {
         conns: Arc<RpcClient>,
         metadata: Arc<Metadata>,
         log_scanner_status: Arc<LogScannerStatus>,
+        projected_fields: Option<Vec<usize>>,
     ) -> Self {
+        let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
+        let read_context = Self::create_read_context(full_arrow_schema, 
projected_fields);
         LogFetcher {
             table_path: table_info.table_path.clone(),
-            conns: conns.clone(),
-            table_info: table_info.clone(),
-            metadata: metadata.clone(),
-            log_scanner_status: log_scanner_status.clone(),
+            conns,
+            table_info,
+            metadata,
+            log_scanner_status,
+            read_context,
+        }
+    }
+
+    fn create_read_context(
+        full_arrow_schema: SchemaRef,
+        projected_fields: Option<Vec<usize>>,
+    ) -> ReadContext {
+        match projected_fields {
+            None => ReadContext::new(full_arrow_schema),
+            Some(fields) => {
+                
ReadContext::with_projection_pushdown(full_arrow_schema.clone(), fields)

Review Comment:
   [nitpick] The `clone()` here is unnecessary: `full_arrow_schema` is an owned 
`SchemaRef` (an `Arc<Schema>`) that is not used again after this call, so it 
can be moved into `with_projection_pushdown` instead of incrementing the 
reference count. Consider removing the `.clone()`:
   ```rust
   Some(fields) => ReadContext::with_projection_pushdown(full_arrow_schema, 
fields)
   ```
   ```suggestion
                   ReadContext::with_projection_pushdown(full_arrow_schema, 
fields)
   ```



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -554,19 +589,135 @@ pub fn to_arrow_type(fluss_type: &DataType) -> 
ArrowDataType {
     }
 }
 
+#[derive(Clone)]
 pub struct ReadContext {
-    arrow_schema: SchemaRef,
+    target_schema: SchemaRef,
+
+    projection: Option<Projection>,
+}
+
+#[derive(Clone)]
+struct Projection {
+    ordered_schema: SchemaRef,
+    projected_fields: Vec<usize>,
+
+    reordering_indexes: Vec<usize>,
+    reordering_needed: bool,
 }
 
 impl ReadContext {
     pub fn new(arrow_schema: SchemaRef) -> ReadContext {
-        ReadContext { arrow_schema }
+        ReadContext {
+            target_schema: arrow_schema,
+            projection: None,
+        }
     }
 
-    pub fn to_arrow_metadata(&self) -> Result<Vec<u8>> {
-        let mut arrow_schema_bytes = vec![];
-        let _writer = StreamWriter::try_new(&mut arrow_schema_bytes, 
&self.arrow_schema)?;
-        Ok(arrow_schema_bytes)
+    pub fn with_projection_pushdown(
+        arrow_schema: SchemaRef,
+        projected_fields: Vec<usize>,
+    ) -> ReadContext {
+        let target_schema = Self::project_schema(arrow_schema.clone(), 
projected_fields.as_slice());
+        let mut sorted_fields = projected_fields.clone();
+        sorted_fields.sort_unstable();
+
+        let project = {
+            if !sorted_fields.eq(&projected_fields) {
+                // reordering is required
+                // Calculate reordering indexes to transform from sorted order 
to user-requested order
+                let mut reordering_indexes = 
Vec::with_capacity(projected_fields.len());
+                for &original_idx in &projected_fields {
+                    let pos = sorted_fields
+                        .binary_search(&original_idx)
+                        .expect("projection index should exist in sorted 
list");
+                    reordering_indexes.push(pos);
+                }
+                Projection {
+                    ordered_schema: Self::project_schema(
+                        arrow_schema.clone(),
+                        sorted_fields.as_slice(),
+                    ),
+                    projected_fields,
+                    reordering_indexes,
+                    reordering_needed: true,
+                }
+            } else {
+                Projection {
+                    ordered_schema: Self::project_schema(arrow_schema, 
projected_fields.as_slice()),
+                    projected_fields,
+                    reordering_indexes: vec![],
+                    reordering_needed: false,
+                }
+            }
+        };
+
+        ReadContext {
+            target_schema,
+            projection: Some(project),
+        }
+    }
+
+    pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> 
SchemaRef {
+        // todo: handle the exception

Review Comment:
   [nitpick] The TODO comment "todo: handle the exception" should be either 
implemented or converted to a more descriptive comment explaining what 
exception handling is needed and when it will be addressed. 
   
   If schema projection can fail here (which the `expect` suggests), this 
should be handled properly as mentioned in a related comment about this code 
section.



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -47,14 +50,82 @@ impl<'a> TableScan<'a> {
             conn,
             table_info,
             metadata,
+            projected_fields: None,
         }
     }
 
-    pub fn create_log_scanner(&self) -> LogScanner {
+    /// Projects the scan to only include specified columns by their indices.
+    ///
+    /// # Arguments
+    /// * `column_indices` - Zero-based indices of columns to include in the 
scan
+    ///
+    /// # Errors
+    /// Returns an error if `column_indices` is empty or if any column index 
is out of range.
+    ///
+    /// # Example
+    /// ```
+    /// let scanner = table.new_scan().project(&[0, 2, 
3])?.create_log_scanner();
+    /// ```
+    pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
+        if column_indices.is_empty() {
+            return Err(Error::IllegalArgument(
+                "Column indices cannot be empty".to_string(),
+            ));
+        }
+        let field_count = self.table_info.row_type().fields().len();
+        for &idx in column_indices {
+            if idx >= field_count {
+                return Err(Error::IllegalArgument(format!(
+                    "Column index {} out of range (max: {})",
+                    idx,
+                    field_count - 1
+                )));
+            }
+        }
+        self.projected_fields = Some(column_indices.to_vec());
+        Ok(self)
+    }
+
+    /// Projects the scan to only include specified columns by their names.
+    ///
+    /// # Arguments
+    /// * `column_names` - Names of columns to include in the scan
+    ///
+    /// # Errors
+    /// Returns an error if `column_names` is empty or if any column name is 
not found in the table schema.
+    ///
+    /// # Example
+    /// ```
+    /// let scanner = table.new_scan().project_by_name(&["col1", 
"col3"])?.create_log_scanner();
+    /// ```

Review Comment:
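   The documentation examples in both the `project` and `project_by_name` 
methods are not compilable Rust doc tests - they're missing the necessary setup 
code (like `table`) and will fail to compile under `cargo test --doc`. Consider 
either:
   1. Adding `no_run` to prevent execution: `/// # Example\n/// ```no_run` 
(note that `no_run` examples are still compiled, so the hidden setup lines 
below are required; use `ignore` if the example cannot be made to compile)
   2. Providing complete examples with proper context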
   
   Example for `project`:
   ```rust
   /// # Example
   /// ```no_run
   /// # use fluss::client::FlussConnection;
   /// # async fn example() -> fluss::error::Result<()> {
   /// # let conn = FlussConnection::new(config).await?;
   /// # let table = conn.get_table(&table_path).await?;
   /// let scanner = table.new_scan().project(&[0, 2, 3])?.create_log_scanner();
   /// # Ok(())
   /// # }
   /// ```
   ```



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -554,19 +589,135 @@ pub fn to_arrow_type(fluss_type: &DataType) -> 
ArrowDataType {
     }
 }
 
+#[derive(Clone)]
 pub struct ReadContext {
-    arrow_schema: SchemaRef,
+    target_schema: SchemaRef,
+
+    projection: Option<Projection>,
+}
+
+#[derive(Clone)]
+struct Projection {
+    ordered_schema: SchemaRef,
+    projected_fields: Vec<usize>,
+
+    reordering_indexes: Vec<usize>,
+    reordering_needed: bool,
 }
 
 impl ReadContext {
     pub fn new(arrow_schema: SchemaRef) -> ReadContext {
-        ReadContext { arrow_schema }
+        ReadContext {
+            target_schema: arrow_schema,
+            projection: None,
+        }
     }
 
-    pub fn to_arrow_metadata(&self) -> Result<Vec<u8>> {
-        let mut arrow_schema_bytes = vec![];
-        let _writer = StreamWriter::try_new(&mut arrow_schema_bytes, 
&self.arrow_schema)?;
-        Ok(arrow_schema_bytes)
+    pub fn with_projection_pushdown(
+        arrow_schema: SchemaRef,
+        projected_fields: Vec<usize>,
+    ) -> ReadContext {
+        let target_schema = Self::project_schema(arrow_schema.clone(), 
projected_fields.as_slice());
+        let mut sorted_fields = projected_fields.clone();
+        sorted_fields.sort_unstable();
+
+        let project = {
+            if !sorted_fields.eq(&projected_fields) {
+                // reordering is required
+                // Calculate reordering indexes to transform from sorted order 
to user-requested order
+                let mut reordering_indexes = 
Vec::with_capacity(projected_fields.len());
+                for &original_idx in &projected_fields {
+                    let pos = sorted_fields
+                        .binary_search(&original_idx)
+                        .expect("projection index should exist in sorted 
list");
+                    reordering_indexes.push(pos);
+                }
+                Projection {
+                    ordered_schema: Self::project_schema(
+                        arrow_schema.clone(),
+                        sorted_fields.as_slice(),
+                    ),
+                    projected_fields,
+                    reordering_indexes,
+                    reordering_needed: true,
+                }
+            } else {
+                Projection {
+                    ordered_schema: Self::project_schema(arrow_schema, 
projected_fields.as_slice()),
+                    projected_fields,
+                    reordering_indexes: vec![],
+                    reordering_needed: false,
+                }
+            }
+        };
+
+        ReadContext {
+            target_schema,
+            projection: Some(project),
+        }
+    }
+
+    pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> 
SchemaRef {
+        // todo: handle the exception
+        SchemaRef::new(
+            schema
+                .project(projected_fields)
+                .expect("can't project schema"),
+        )
+    }
+
+    pub fn is_projection_pushdown(&self) -> bool {
+        self.projection.is_some()
+    }
+
+    pub fn project_fields(&self) -> Option<&[usize]> {
+        self.projection
+            .as_ref()
+            .map(|p| p.projected_fields.as_slice())
+    }
+
+    pub fn record_batch(&self, data: &[u8]) -> Result<Option<RecordBatch>> {
+        let (batch_metadata, body_buffer, version) = match 
parse_ipc_message(data) {
+            Some(result) => result,
+            None => return Ok(None),
+        };
+
+        // the record batch from server must be ordered by field pos
+        // according to project to decide what arrow schema to use
+        // to parse the record batch
+        let resolve_schema = match self.projection {
+            Some(ref projection) => {
+                // projection, should use ordered schema by project field pos
+                projection.ordered_schema.clone()
+            }
+            None => {
+                // no projection, use target output schema
+                self.target_schema.clone()
+            }
+        };
+
+        let record_batch = read_record_batch(
+            &body_buffer,
+            batch_metadata,
+            resolve_schema,
+            &std::collections::HashMap::new(),
+            None,
+            &version,
+        )?;
+
+        let record_batch = match &self.projection {
+            Some(projection) if projection.reordering_needed => {
+                // Reorder columns if needed (when projection pushdown with 
non-sorted order
+                let reordered_columns: Vec<_> = projection
+                    .reordering_indexes
+                    .iter()
+                    .map(|&idx| record_batch.column(idx).clone())
+                    .collect();
+                RecordBatch::try_new(record_batch.schema(), reordered_columns)?

Review Comment:
   When `reordering_needed` is true, the RecordBatch is recreated using the 
original `record_batch.schema()` instead of `self.target_schema`. This means 
the reordered batch will have the sorted schema rather than the target schema 
that matches the user-requested column order. This could lead to schema 
mismatches.
   
   Consider using `self.target_schema.clone()` instead:
   ```rust
   RecordBatch::try_new(self.target_schema.clone(), reordered_columns)?
   ```
   ```suggestion
                   RecordBatch::try_new(self.target_schema.clone(), 
reordered_columns)?
   ```



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -472,39 +477,69 @@ impl<'a> LogRecordBatch<'a> {
         LittleEndian::read_i32(&self.data[offset..offset + 
RECORDS_COUNT_LENGTH])
     }
 
-    pub fn records(&self, read_context: ReadContext) -> LogRecordIterator {
-        let count = self.record_count();
-        if count == 0 {
-            return LogRecordIterator::empty();
+    pub fn records(&self, read_context: &ReadContext) -> 
Result<LogRecordIterator> {
+        if self.record_count() == 0 {
+            return Ok(LogRecordIterator::empty());
         }
 
-        // get arrow_metadata
-        let arrow_metadata_bytes = read_context.to_arrow_metadata().unwrap();
-        // arrow_batch_data
         let data = &self.data[RECORDS_OFFSET..];
 
-        // need to combine arrow_metadata_bytes + arrow_batch_data
-        let cursor = Cursor::new([&arrow_metadata_bytes, data].concat());
-        let mut stream_reader = StreamReader::try_new(cursor, None).unwrap();
+        let record_batch = read_context.record_batch(data)?;
+        let log_record_iterator = match record_batch {
+            None => LogRecordIterator::empty(),
+            Some(record_batch) => {
+                let arrow_reader = ArrowReader::new(Arc::new(record_batch));
+                LogRecordIterator::Arrow(ArrowLogRecordIterator {
+                    reader: arrow_reader,
+                    base_offset: self.base_log_offset(),
+                    timestamp: self.commit_timestamp(),
+                    row_id: 0,
+                    change_type: ChangeType::AppendOnly,
+                })
+            }
+        };
+        Ok(log_record_iterator)
+    }
+}
 
-        let mut record_batch = None;
-        if let Some(bath) = stream_reader.next() {
-            record_batch = Some(bath.unwrap());
-        }
+/// Parse an Arrow IPC message from a byte slice.
+///
+/// Server returns RecordBatch message (without Schema message) in the 
encapsulated message format.
+/// Format: [continuation: 4 bytes (0xFFFFFFFF)][metadata_size: 4 
bytes][RecordBatch metadata][body]
+///
+/// This format is documented at:
+/// 
https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
+///
+/// Returns the RecordBatch metadata, body buffer, and metadata version.

Review Comment:
   [nitpick] Missing documentation for the `parse_ipc_message` function 
parameters and return value. The function has good documentation about the 
format, but should also document what each parameter means and what the return 
tuple contains.
   
   Consider adding:
   ```rust
   /// # Arguments
   /// * `data` - The byte slice containing the IPC message
   ///
   /// # Returns
   /// Returns `Some((batch_metadata, body_buffer, version))` on success:
   /// - `batch_metadata`: The RecordBatch metadata from the IPC message
   /// - `body_buffer`: The buffer containing the record batch body data
   /// - `version`: The Arrow IPC metadata version
   ///
   /// Returns `None` if the data is malformed or too short.
   ```
   ```suggestion
   /// # Arguments
   /// * `data` - The byte slice containing the IPC message.
   ///
   /// # Returns
   /// Returns `Some((batch_metadata, body_buffer, version))` on success:
   /// - `batch_metadata`: The RecordBatch metadata from the IPC message.
   /// - `body_buffer`: The buffer containing the record batch body data.
   /// - `version`: The Arrow IPC metadata version.
   ///
   /// Returns `None` if the data is malformed or too short.
   ```



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -47,14 +50,82 @@ impl<'a> TableScan<'a> {
             conn,
             table_info,
             metadata,
+            projected_fields: None,
         }
     }
 
-    pub fn create_log_scanner(&self) -> LogScanner {
+    /// Projects the scan to only include specified columns by their indices.
+    ///
+    /// # Arguments
+    /// * `column_indices` - Zero-based indices of columns to include in the 
scan
+    ///
+    /// # Errors
+    /// Returns an error if `column_indices` is empty or if any column index 
is out of range.
+    ///
+    /// # Example
+    /// ```
+    /// let scanner = table.new_scan().project(&[0, 2, 
3])?.create_log_scanner();
+    /// ```
+    pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
+        if column_indices.is_empty() {
+            return Err(Error::IllegalArgument(
+                "Column indices cannot be empty".to_string(),
+            ));
+        }
+        let field_count = self.table_info.row_type().fields().len();
+        for &idx in column_indices {
+            if idx >= field_count {
+                return Err(Error::IllegalArgument(format!(
+                    "Column index {} out of range (max: {})",
+                    idx,
+                    field_count - 1
+                )));
+            }
+        }
+        self.projected_fields = Some(column_indices.to_vec());

Review Comment:
   The `project` method doesn't check for duplicate column indices. If a user 
calls `project(&[0, 0, 2])`, it will add the same column index twice to 
`projected_fields`, which could lead to unexpected behavior in projection. 
Consider adding validation to reject duplicate indices or de-duplicate them.



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -554,19 +589,135 @@ pub fn to_arrow_type(fluss_type: &DataType) -> 
ArrowDataType {
     }
 }
 
+#[derive(Clone)]
 pub struct ReadContext {
-    arrow_schema: SchemaRef,
+    target_schema: SchemaRef,
+
+    projection: Option<Projection>,
+}
+
+#[derive(Clone)]
+struct Projection {
+    ordered_schema: SchemaRef,
+    projected_fields: Vec<usize>,
+
+    reordering_indexes: Vec<usize>,
+    reordering_needed: bool,
 }
 
 impl ReadContext {
     pub fn new(arrow_schema: SchemaRef) -> ReadContext {
-        ReadContext { arrow_schema }
+        ReadContext {
+            target_schema: arrow_schema,
+            projection: None,
+        }
     }
 
-    pub fn to_arrow_metadata(&self) -> Result<Vec<u8>> {
-        let mut arrow_schema_bytes = vec![];
-        let _writer = StreamWriter::try_new(&mut arrow_schema_bytes, 
&self.arrow_schema)?;
-        Ok(arrow_schema_bytes)
+    pub fn with_projection_pushdown(
+        arrow_schema: SchemaRef,
+        projected_fields: Vec<usize>,
+    ) -> ReadContext {
+        let target_schema = Self::project_schema(arrow_schema.clone(), 
projected_fields.as_slice());
+        let mut sorted_fields = projected_fields.clone();
+        sorted_fields.sort_unstable();
+
+        let project = {
+            if !sorted_fields.eq(&projected_fields) {
+                // reordering is required
+                // Calculate reordering indexes to transform from sorted order 
to user-requested order
+                let mut reordering_indexes = 
Vec::with_capacity(projected_fields.len());
+                for &original_idx in &projected_fields {
+                    let pos = sorted_fields
+                        .binary_search(&original_idx)
+                        .expect("projection index should exist in sorted 
list");
+                    reordering_indexes.push(pos);
+                }
+                Projection {
+                    ordered_schema: Self::project_schema(
+                        arrow_schema.clone(),
+                        sorted_fields.as_slice(),
+                    ),
+                    projected_fields,
+                    reordering_indexes,
+                    reordering_needed: true,
+                }
+            } else {
+                Projection {
+                    ordered_schema: Self::project_schema(arrow_schema, 
projected_fields.as_slice()),
+                    projected_fields,
+                    reordering_indexes: vec![],
+                    reordering_needed: false,
+                }
+            }
+        };
+
+        ReadContext {
+            target_schema,
+            projection: Some(project),
+        }
+    }
+
+    pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> 
SchemaRef {
+        // todo: handle the exception
+        SchemaRef::new(
+            schema
+                .project(projected_fields)
+                .expect("can't project schema"),
+        )
+    }
+
+    pub fn is_projection_pushdown(&self) -> bool {
+        self.projection.is_some()
+    }
+
+    pub fn project_fields(&self) -> Option<&[usize]> {
+        self.projection
+            .as_ref()
+            .map(|p| p.projected_fields.as_slice())
+    }
+
+    pub fn record_batch(&self, data: &[u8]) -> Result<Option<RecordBatch>> {
+        let (batch_metadata, body_buffer, version) = match 
parse_ipc_message(data) {
+            Some(result) => result,
+            None => return Ok(None),
+        };
+
+        // the record batch from server must be ordered by field pos
+        // according to project to decide what arrow schema to use
+        // to parse the record batch
+        let resolve_schema = match self.projection {
+            Some(ref projection) => {
+                // projection, should use ordered schema by project field pos
+                projection.ordered_schema.clone()
+            }
+            None => {
+                // no projection, use target output schema
+                self.target_schema.clone()
+            }
+        };
+
+        let record_batch = read_record_batch(
+            &body_buffer,
+            batch_metadata,
+            resolve_schema,
+            &std::collections::HashMap::new(),
+            None,
+            &version,
+        )?;
+
+        let record_batch = match &self.projection {
+            Some(projection) if projection.reordering_needed => {
+                // Reorder columns if needed (when projection pushdown with 
non-sorted order

Review Comment:
   The comment says "Reorder columns if needed (when projection pushdown with 
non-sorted order" but is missing a closing parenthesis. Should be:
   ```rust
   // Reorder columns if needed (when projection pushdown with non-sorted order)
   ```
   ```suggestion
                   // Reorder columns if needed (when projection pushdown with 
non-sorted order)
   ```



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -47,14 +50,82 @@ impl<'a> TableScan<'a> {
             conn,
             table_info,
             metadata,
+            projected_fields: None,
         }
     }
 
-    pub fn create_log_scanner(&self) -> LogScanner {
+    /// Projects the scan to only include specified columns by their indices.
+    ///
+    /// # Arguments
+    /// * `column_indices` - Zero-based indices of columns to include in the 
scan
+    ///
+    /// # Errors
+    /// Returns an error if `column_indices` is empty or if any column index 
is out of range.
+    ///
+    /// # Example
+    /// ```
+    /// let scanner = table.new_scan().project(&[0, 2, 
3])?.create_log_scanner();
+    /// ```
+    pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
+        if column_indices.is_empty() {
+            return Err(Error::IllegalArgument(
+                "Column indices cannot be empty".to_string(),
+            ));
+        }
+        let field_count = self.table_info.row_type().fields().len();
+        for &idx in column_indices {
+            if idx >= field_count {
+                return Err(Error::IllegalArgument(format!(
+                    "Column index {} out of range (max: {})",
+                    idx,
+                    field_count - 1
+                )));
+            }
+        }
+        self.projected_fields = Some(column_indices.to_vec());
+        Ok(self)
+    }
+
+    /// Projects the scan to only include specified columns by their names.
+    ///
+    /// # Arguments
+    /// * `column_names` - Names of columns to include in the scan
+    ///
+    /// # Errors
+    /// Returns an error if `column_names` is empty or if any column name is 
not found in the table schema.
+    ///
+    /// # Example
+    /// ```
+    /// let scanner = table.new_scan().project_by_name(&["col1", 
"col3"])?.create_log_scanner();
+    /// ```
+    pub fn project_by_name(mut self, column_names: &[&str]) -> Result<Self> {
+        if column_names.is_empty() {
+            return Err(Error::IllegalArgument(
+                "Column names cannot be empty".to_string(),
+            ));
+        }
+        let row_type = self.table_info.row_type();
+        let mut indices = Vec::new();
+
+        for name in column_names {
+            let idx = row_type
+                .fields()
+                .iter()
+                .position(|f| f.name() == *name)
+                .ok_or_else(|| Error::IllegalArgument(format!("Column '{name}' 
not found")))?;
+            indices.push(idx);
+        }

Review Comment:
   The `project_by_name` method doesn't check for duplicate column names. If a 
user calls `project_by_name(&["col1", "col1"])`, it will add the same column 
index twice to `indices`, which could lead to unexpected behavior in 
projection. Consider adding validation to reject duplicate column names or 
de-duplicate them.



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -554,19 +589,135 @@ pub fn to_arrow_type(fluss_type: &DataType) -> 
ArrowDataType {
     }
 }
 
+#[derive(Clone)]
 pub struct ReadContext {
-    arrow_schema: SchemaRef,
+    target_schema: SchemaRef,
+
+    projection: Option<Projection>,
+}
+
+#[derive(Clone)]
+struct Projection {
+    ordered_schema: SchemaRef,
+    projected_fields: Vec<usize>,
+
+    reordering_indexes: Vec<usize>,
+    reordering_needed: bool,
 }
 
 impl ReadContext {
     pub fn new(arrow_schema: SchemaRef) -> ReadContext {
-        ReadContext { arrow_schema }
+        ReadContext {
+            target_schema: arrow_schema,
+            projection: None,
+        }
     }
 
-    pub fn to_arrow_metadata(&self) -> Result<Vec<u8>> {
-        let mut arrow_schema_bytes = vec![];
-        let _writer = StreamWriter::try_new(&mut arrow_schema_bytes, 
&self.arrow_schema)?;
-        Ok(arrow_schema_bytes)
+    pub fn with_projection_pushdown(
+        arrow_schema: SchemaRef,
+        projected_fields: Vec<usize>,
+    ) -> ReadContext {
+        let target_schema = Self::project_schema(arrow_schema.clone(), 
projected_fields.as_slice());
+        let mut sorted_fields = projected_fields.clone();
+        sorted_fields.sort_unstable();
+
+        let project = {
+            if !sorted_fields.eq(&projected_fields) {
+                // reordering is required
+                // Calculate reordering indexes to transform from sorted order 
to user-requested order
+                let mut reordering_indexes = 
Vec::with_capacity(projected_fields.len());
+                for &original_idx in &projected_fields {
+                    let pos = sorted_fields
+                        .binary_search(&original_idx)
+                        .expect("projection index should exist in sorted 
list");
+                    reordering_indexes.push(pos);
+                }
+                Projection {
+                    ordered_schema: Self::project_schema(
+                        arrow_schema.clone(),
+                        sorted_fields.as_slice(),
+                    ),
+                    projected_fields,
+                    reordering_indexes,
+                    reordering_needed: true,
+                }
+            } else {
+                Projection {
+                    ordered_schema: Self::project_schema(arrow_schema, 
projected_fields.as_slice()),
+                    projected_fields,
+                    reordering_indexes: vec![],
+                    reordering_needed: false,
+                }
+            }
+        };
+
+        ReadContext {
+            target_schema,
+            projection: Some(project),
+        }
+    }
+
+    pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> 
SchemaRef {
+        // todo: handle the exception
+        SchemaRef::new(
+            schema
+                .project(projected_fields)
+                .expect("can't project schema"),

Review Comment:
   The `expect` call will panic if schema projection fails. Since this is 
handling user-provided projection indices (which are already validated in 
`TableScan::project`), the panic risk is low but could still occur due to 
internal inconsistencies. Consider returning a proper error with context about 
which indices failed (this requires changing the return type of 
`project_schema` to `Result<SchemaRef>` and propagating it through its 
callers):
   ```rust
   SchemaRef::new(
       schema
           .project(projected_fields)
           .map_err(|e| Error::IllegalArgument(format!("Failed to project 
schema: {}", e)))?,
   )
   ```



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -47,14 +50,82 @@ impl<'a> TableScan<'a> {
             conn,
             table_info,
             metadata,
+            projected_fields: None,
         }
     }
 
-    pub fn create_log_scanner(&self) -> LogScanner {
+    /// Projects the scan to only include specified columns by their indices.
+    ///
+    /// # Arguments
+    /// * `column_indices` - Zero-based indices of columns to include in the 
scan
+    ///
+    /// # Errors
+    /// Returns an error if `column_indices` is empty or if any column index 
is out of range.
+    ///
+    /// # Example
+    /// ```
+    /// let scanner = table.new_scan().project(&[0, 2, 
3])?.create_log_scanner();
+    /// ```
+    pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
+        if column_indices.is_empty() {
+            return Err(Error::IllegalArgument(
+                "Column indices cannot be empty".to_string(),
+            ));
+        }
+        let field_count = self.table_info.row_type().fields().len();
+        for &idx in column_indices {
+            if idx >= field_count {
+                return Err(Error::IllegalArgument(format!(
+                    "Column index {} out of range (max: {})",
+                    idx,
+                    field_count - 1
+                )));
+            }
+        }
+        self.projected_fields = Some(column_indices.to_vec());
+        Ok(self)
+    }
+
+    /// Projects the scan to only include specified columns by their names.
+    ///
+    /// # Arguments
+    /// * `column_names` - Names of columns to include in the scan
+    ///
+    /// # Errors
+    /// Returns an error if `column_names` is empty or if any column name is 
not found in the table schema.
+    ///
+    /// # Example
+    /// ```
+    /// let scanner = table.new_scan().project_by_name(&["col1", 
"col3"])?.create_log_scanner();
+    /// ```
+    pub fn project_by_name(mut self, column_names: &[&str]) -> Result<Self> {
+        if column_names.is_empty() {
+            return Err(Error::IllegalArgument(
+                "Column names cannot be empty".to_string(),
+            ));
+        }
+        let row_type = self.table_info.row_type();
+        let mut indices = Vec::new();
+
+        for name in column_names {
+            let idx = row_type
+                .fields()
+                .iter()
+                .position(|f| f.name() == *name)
+                .ok_or_else(|| Error::IllegalArgument(format!("Column '{name}' 
not found")))?;
+            indices.push(idx);
+        }
+
+        self.projected_fields = Some(indices);
+        Ok(self)
+    }

Review Comment:
   The new column projection functionality lacks test coverage. There are no 
tests for:
   - `TableScan::project()` method
   - `TableScan::project_by_name()` method  
   - `ReadContext::with_projection_pushdown()` 
   - The projection reordering logic in `ReadContext::record_batch()`
   - Edge cases like duplicate columns, out-of-order projections, etc.
   
   Since this repository has integration tests (see 
`crates/fluss/tests/integration/table.rs`), consider adding tests for these new 
features. For example:
   ```rust
   #[tokio::test]
   async fn test_column_projection() {
       // Test that projecting specific columns returns only those columns
   }
   
   #[tokio::test]
   async fn test_column_projection_by_name() {
       // Test projection by column names
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to