This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new e4e017c  feat: support ColumnPruning (#57)
e4e017c is described below

commit e4e017c22e3f2635105f4d7dcd61cfe7b48a4768
Author: AlexZhao <[email protected]>
AuthorDate: Thu Dec 4 10:58:56 2025 +0800

    feat: support ColumnPruning (#57)
---
 crates/examples/Cargo.toml               |   2 -
 crates/fluss/src/client/table/scanner.rs | 115 +++++++++++++--
 crates/fluss/src/record/arrow.rs         | 239 ++++++++++++++++++++++++++-----
 crates/fluss/tests/integration/table.rs  |  87 ++++++++++-
 4 files changed, 390 insertions(+), 53 deletions(-)

diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml
index 82d864f..dab85b6 100644
--- a/crates/examples/Cargo.toml
+++ b/crates/examples/Cargo.toml
@@ -27,8 +27,6 @@ version = { workspace = true }
 fluss = { workspace = true }
 tokio = { workspace = true }
 clap = { workspace = true}
-
-
 [[example]]
 name = "example-table"
 path = "src/example_table.rs"
\ No newline at end of file
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index e1ab59f..13372ef 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -17,12 +17,13 @@
 
 use crate::client::connection::FlussConnection;
 use crate::client::metadata::Metadata;
-use crate::error::Result;
+use crate::error::{Error, Result};
 use crate::metadata::{TableBucket, TableInfo, TablePath};
 use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
 use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
 use crate::rpc::RpcClient;
 use crate::util::FairBucketStatusMap;
+use arrow_schema::SchemaRef;
 use parking_lot::RwLock;
 use std::collections::HashMap;
 use std::slice::from_ref;
@@ -39,6 +40,8 @@ pub struct TableScan<'a> {
     conn: &'a FlussConnection,
     table_info: TableInfo,
     metadata: Arc<Metadata>,
+    /// Column indices to project. None means all columns, Some(vec) means only the specified columns (non-empty).
+    projected_fields: Option<Vec<usize>>,
 }
 
 impl<'a> TableScan<'a> {
@@ -47,14 +50,82 @@ impl<'a> TableScan<'a> {
             conn,
             table_info,
             metadata,
+            projected_fields: None,
         }
     }
 
-    pub fn create_log_scanner(&self) -> LogScanner {
+    /// Projects the scan to only include specified columns by their indices.
+    ///
+    /// # Arguments
+    /// * `column_indices` - Zero-based indices of columns to include in the scan
+    ///
+    /// # Errors
+    /// Returns an error if `column_indices` is empty or if any column index is out of range.
+    ///
+    /// # Example
+    /// ```
+    /// let scanner = table.new_scan().project(&[0, 2, 3])?.create_log_scanner();
+    /// ```
+    pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
+        if column_indices.is_empty() {
+            return Err(Error::IllegalArgument(
+                "Column indices cannot be empty".to_string(),
+            ));
+        }
+        let field_count = self.table_info.row_type().fields().len();
+        for &idx in column_indices {
+            if idx >= field_count {
+                return Err(Error::IllegalArgument(format!(
+                    "Column index {} out of range (max: {})",
+                    idx,
+                    field_count - 1
+                )));
+            }
+        }
+        self.projected_fields = Some(column_indices.to_vec());
+        Ok(self)
+    }
+
+    /// Projects the scan to only include specified columns by their names.
+    ///
+    /// # Arguments
+    /// * `column_names` - Names of columns to include in the scan
+    ///
+    /// # Errors
+    /// Returns an error if `column_names` is empty or if any column name is not found in the table schema.
+    ///
+    /// # Example
+    /// ```
+    /// let scanner = table.new_scan().project_by_name(&["col1", "col3"])?.create_log_scanner();
+    /// ```
+    pub fn project_by_name(mut self, column_names: &[&str]) -> Result<Self> {
+        if column_names.is_empty() {
+            return Err(Error::IllegalArgument(
+                "Column names cannot be empty".to_string(),
+            ));
+        }
+        let row_type = self.table_info.row_type();
+        let mut indices = Vec::new();
+
+        for name in column_names {
+            let idx = row_type
+                .fields()
+                .iter()
+                .position(|f| f.name() == *name)
+                .ok_or_else(|| Error::IllegalArgument(format!("Column '{name}' not found")))?;
+            indices.push(idx);
+        }
+
+        self.projected_fields = Some(indices);
+        Ok(self)
+    }
+
+    pub fn create_log_scanner(self) -> LogScanner {
         LogScanner::new(
             &self.table_info,
             self.metadata.clone(),
             self.conn.get_connections(),
+            self.projected_fields,
         )
     }
 }
@@ -72,6 +143,7 @@ impl LogScanner {
         table_info: &TableInfo,
         metadata: Arc<Metadata>,
         connections: Arc<RpcClient>,
+        projected_fields: Option<Vec<usize>>,
     ) -> Self {
         let log_scanner_status = Arc::new(LogScannerStatus::new());
         Self {
@@ -84,6 +156,7 @@ impl LogScanner {
                 connections.clone(),
                 metadata.clone(),
                 log_scanner_status.clone(),
+                projected_fields,
             ),
         }
     }
@@ -114,6 +187,7 @@ struct LogFetcher {
     table_info: TableInfo,
     metadata: Arc<Metadata>,
     log_scanner_status: Arc<LogScannerStatus>,
+    read_context: ReadContext,
 }
 
 impl LogFetcher {
@@ -122,13 +196,27 @@ impl LogFetcher {
         conns: Arc<RpcClient>,
         metadata: Arc<Metadata>,
         log_scanner_status: Arc<LogScannerStatus>,
+        projected_fields: Option<Vec<usize>>,
     ) -> Self {
+        let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
+        let read_context = Self::create_read_context(full_arrow_schema, projected_fields);
         LogFetcher {
             table_path: table_info.table_path.clone(),
-            conns: conns.clone(),
-            table_info: table_info.clone(),
-            metadata: metadata.clone(),
-            log_scanner_status: log_scanner_status.clone(),
+            conns,
+            table_info,
+            metadata,
+            log_scanner_status,
+            read_context,
+        }
+    }
+
+    fn create_read_context(
+        full_arrow_schema: SchemaRef,
+        projected_fields: Option<Vec<usize>>,
+    ) -> ReadContext {
+        match projected_fields {
+            None => ReadContext::new(full_arrow_schema),
+            Some(fields) => ReadContext::with_projection_pushdown(full_arrow_schema, fields),
         }
     }
 
@@ -149,7 +237,7 @@ impl LogFetcher {
             for pb_fetch_log_resp in fetch_response.tables_resp {
                 let table_id = pb_fetch_log_resp.table_id;
                 let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
-                let arrow_schema = to_arrow_schema(self.table_info.get_row_type());
+
                 for fetch_log_for_bucket in fetch_log_for_buckets {
                     let mut fetch_records = vec![];
                     let bucket: i32 = fetch_log_for_bucket.bucket_id;
@@ -158,8 +246,7 @@ impl LogFetcher {
                         let data = fetch_log_for_bucket.records.unwrap();
                         for log_record in &mut LogRecordsBatchs::new(&data) {
                             let last_offset = log_record.last_log_offset();
-                            fetch_records
-                                .extend(log_record.records(ReadContext::new(arrow_schema.clone())));
+                            fetch_records.extend(log_record.records(&self.read_context)?);
                             self.log_scanner_status
                                 .update_offset(&table_bucket, last_offset + 1);
                         }
@@ -209,13 +296,19 @@ impl LogFetcher {
         if ready_for_fetch_count == 0 {
             HashMap::new()
         } else {
+            let (projection_enabled, projected_fields) =
+                match self.read_context.project_fields_in_order() {
+                    None => (false, vec![]),
+                    Some(fields) => (true, fields.iter().map(|&i| i as i32).collect()),
+                };
+
             fetch_log_req_for_buckets
                 .into_iter()
                 .map(|(leader_id, feq_for_buckets)| {
                     let req_for_table = PbFetchLogReqForTable {
                         table_id: table_id.unwrap(),
-                        projection_pushdown_enabled: false,
-                        projected_fields: vec![],
+                        projection_pushdown_enabled: projection_enabled,
+                        projected_fields: projected_fields.clone(),
                         buckets_req: feq_for_buckets,
                     };
 
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 487f50c..29bfe41 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -27,7 +27,12 @@ use arrow::array::{
 };
 use arrow::{
     array::RecordBatch,
-    ipc::{reader::StreamReader, writer::StreamWriter},
+    buffer::Buffer,
+    ipc::{
+        reader::{StreamReader, read_record_batch},
+        root_as_message,
+        writer::StreamWriter,
+    },
 };
 use arrow_schema::SchemaRef;
 use arrow_schema::{DataType as ArrowDataType, Field};
@@ -472,41 +477,84 @@ impl<'a> LogRecordBatch<'a> {
         LittleEndian::read_i32(&self.data[offset..offset + RECORDS_COUNT_LENGTH])
     }
 
-    pub fn records(&self, read_context: ReadContext) -> LogRecordIterator {
-        let count = self.record_count();
-        if count == 0 {
-            return LogRecordIterator::empty();
+    pub fn records(&self, read_context: &ReadContext) -> Result<LogRecordIterator> {
+        if self.record_count() == 0 {
+            return Ok(LogRecordIterator::empty());
         }
 
-        // get arrow_metadata
-        let arrow_metadata_bytes = read_context.to_arrow_metadata().unwrap();
-        // arrow_batch_data
         let data = &self.data[RECORDS_OFFSET..];
 
-        // need to combine arrow_metadata_bytes + arrow_batch_data
-        let cursor = Cursor::new([&arrow_metadata_bytes, data].concat());
-        let mut stream_reader = StreamReader::try_new(cursor, None).unwrap();
-
-        let mut record_batch = None;
-        if let Some(bath) = stream_reader.next() {
-            record_batch = Some(bath.unwrap());
-        }
-
-        if record_batch.is_none() {
-            return LogRecordIterator::empty();
-        }
-
-        let arrow_reader = ArrowReader::new(Arc::new(record_batch.unwrap()));
-        LogRecordIterator::Arrow(ArrowLogRecordIterator {
-            reader: arrow_reader,
-            base_offset: self.base_log_offset(),
-            timestamp: self.commit_timestamp(),
-            row_id: 0,
-            change_type: ChangeType::AppendOnly,
-        })
+        let record_batch = read_context.record_batch(data)?;
+        let log_record_iterator = match record_batch {
+            None => LogRecordIterator::empty(),
+            Some(record_batch) => {
+                let arrow_reader = ArrowReader::new(Arc::new(record_batch));
+                LogRecordIterator::Arrow(ArrowLogRecordIterator {
+                    reader: arrow_reader,
+                    base_offset: self.base_log_offset(),
+                    timestamp: self.commit_timestamp(),
+                    row_id: 0,
+                    change_type: ChangeType::AppendOnly,
+                })
+            }
+        };
+        Ok(log_record_iterator)
     }
 }
 
+/// Parse an Arrow IPC message from a byte slice.
+///
+/// Server returns RecordBatch message (without Schema message) in the encapsulated message format.
+/// Format: [continuation: 4 bytes (0xFFFFFFFF)][metadata_size: 4 bytes][RecordBatch metadata][body]
+///
+/// This format is documented at:
+/// https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
+///
+/// # Arguments
+/// * `data` - The byte slice containing the IPC message.
+///
+/// # Returns
+/// Returns `Some((batch_metadata, body_buffer, version))` on success:
+/// - `batch_metadata`: The RecordBatch metadata from the IPC message.
+/// - `body_buffer`: The buffer containing the record batch body data.
+/// - `version`: The Arrow IPC metadata version.
+///
+/// Returns `None` if the data is malformed or too short.
+fn parse_ipc_message(
+    data: &[u8],
+) -> Option<(
+    arrow::ipc::RecordBatch<'_>,
+    Buffer,
+    arrow::ipc::MetadataVersion,
+)> {
+    const CONTINUATION_MARKER: u32 = 0xFFFFFFFF;
+
+    if data.len() < 8 {
+        return None;
+    }
+
+    let continuation = LittleEndian::read_u32(&data[0..4]);
+    let metadata_size = LittleEndian::read_u32(&data[4..8]) as usize;
+
+    if continuation != CONTINUATION_MARKER {
+        return None;
+    }
+
+    if data.len() < 8 + metadata_size {
+        return None;
+    }
+
+    let metadata_bytes = &data[8..8 + metadata_size];
+    let message = root_as_message(metadata_bytes).ok()?;
+    let batch_metadata = message.header_as_record_batch()?;
+
+    let body_start = 8 + metadata_size;
+    let body_data = &data[body_start..];
+    let body_buffer = Buffer::from(body_data);
+
+    Some((batch_metadata, body_buffer, message.version()))
+}
+
 pub fn to_arrow_schema(fluss_schema: &DataType) -> SchemaRef {
     match &fluss_schema {
         DataType::Row(row_type) => {
@@ -554,19 +602,140 @@ pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType {
     }
 }
 
+#[derive(Clone)]
 pub struct ReadContext {
-    arrow_schema: SchemaRef,
+    target_schema: SchemaRef,
+
+    projection: Option<Projection>,
+}
+
+#[derive(Clone)]
+struct Projection {
+    ordered_schema: SchemaRef,
+    projected_fields: Vec<usize>,
+    ordered_fields: Vec<usize>,
+
+    reordering_indexes: Vec<usize>,
+    reordering_needed: bool,
 }
 
 impl ReadContext {
     pub fn new(arrow_schema: SchemaRef) -> ReadContext {
-        ReadContext { arrow_schema }
+        ReadContext {
+            target_schema: arrow_schema,
+            projection: None,
+        }
     }
 
-    pub fn to_arrow_metadata(&self) -> Result<Vec<u8>> {
-        let mut arrow_schema_bytes = vec![];
-        let _writer = StreamWriter::try_new(&mut arrow_schema_bytes, &self.arrow_schema)?;
-        Ok(arrow_schema_bytes)
+    pub fn with_projection_pushdown(
+        arrow_schema: SchemaRef,
+        projected_fields: Vec<usize>,
+    ) -> ReadContext {
+        let target_schema = Self::project_schema(arrow_schema.clone(), projected_fields.as_slice());
+        let mut sorted_fields = projected_fields.clone();
+        sorted_fields.sort_unstable();
+
+        let project = {
+            if !sorted_fields.eq(&projected_fields) {
+                // reordering is required
+                // Calculate reordering indexes to transform from sorted order to user-requested order
+                let mut reordering_indexes = Vec::with_capacity(projected_fields.len());
+                for &original_idx in &projected_fields {
+                    let pos = sorted_fields
+                        .binary_search(&original_idx)
+                        .expect("projection index should exist in sorted list");
+                    reordering_indexes.push(pos);
+                }
+                Projection {
+                    ordered_schema: Self::project_schema(
+                        arrow_schema.clone(),
+                        sorted_fields.as_slice(),
+                    ),
+                    projected_fields,
+                    ordered_fields: sorted_fields,
+                    reordering_indexes,
+                    reordering_needed: true,
+                }
+            } else {
+                Projection {
+                    ordered_schema: Self::project_schema(arrow_schema, projected_fields.as_slice()),
+                    ordered_fields: projected_fields.clone(),
+                    projected_fields,
+                    reordering_indexes: vec![],
+                    reordering_needed: false,
+                }
+            }
+        };
+
+        ReadContext {
+            target_schema,
+            projection: Some(project),
+        }
+    }
+
+    pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> SchemaRef {
+        // todo: handle the exception
+        SchemaRef::new(
+            schema
+                .project(projected_fields)
+                .expect("can't project schema"),
+        )
+    }
+
+    pub fn project_fields(&self) -> Option<&[usize]> {
+        self.projection
+            .as_ref()
+            .map(|p| p.projected_fields.as_slice())
+    }
+
+    pub fn project_fields_in_order(&self) -> Option<&[usize]> {
+        self.projection
+            .as_ref()
+            .map(|p| p.ordered_fields.as_slice())
+    }
+
+    pub fn record_batch(&self, data: &[u8]) -> Result<Option<RecordBatch>> {
+        let (batch_metadata, body_buffer, version) = match parse_ipc_message(data) {
+            Some(result) => result,
+            None => return Ok(None),
+        };
+
+        // the record batch from server must be ordered by field pos,
+        // according to project to decide what arrow schema to use
+        // to parse the record batch
+        let resolve_schema = match self.projection {
+            Some(ref projection) => {
+                // projection, should use ordered schema by project field pos
+                projection.ordered_schema.clone()
+            }
+            None => {
+                // no projection, use target output schema
+                self.target_schema.clone()
+            }
+        };
+
+        let record_batch = read_record_batch(
+            &body_buffer,
+            batch_metadata,
+            resolve_schema,
+            &std::collections::HashMap::new(),
+            None,
+            &version,
+        )?;
+
+        let record_batch = match &self.projection {
+            Some(projection) if projection.reordering_needed => {
+                // Reorder columns if needed (when projection pushdown with non-sorted order)
+                let reordered_columns: Vec<_> = projection
+                    .reordering_indexes
+                    .iter()
+                    .map(|&idx| record_batch.column(idx).clone())
+                    .collect();
+                RecordBatch::try_new(self.target_schema.clone(), reordered_columns)?
+            }
+            _ => record_batch,
+        };
+        Ok(Some(record_batch))
     }
 }
 
diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs
index aa02724..e14b852 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -54,6 +54,7 @@ mod table_test {
         })
         .join()
         .expect("Failed to create cluster");
+
         // wait for 20 seconds to avoid the error like
         // CoordinatorEventProcessor is not initialized yet
         thread::sleep(std::time::Duration::from_secs(20));
@@ -84,14 +85,16 @@ mod table_test {
     }
 
     #[tokio::test]
-    async fn append_record_batch() {
+    async fn append_record_batch_and_scan() {
         let cluster = get_fluss_cluster();
         let connection = cluster.get_fluss_connection().await;
 
         let admin = connection.get_admin().await.expect("Failed to get admin");
 
-        let table_path =
-            TablePath::new("fluss".to_string(), "test_append_record_batch".to_string());
+        let table_path = TablePath::new(
+            "fluss".to_string(),
+            "test_append_record_batch_and_scan".to_string(),
+        );
 
         let table_descriptor = TableDescriptor::builder()
             .schema(
@@ -101,15 +104,18 @@ mod table_test {
                     .build()
                     .expect("Failed to build schema"),
             )
+            .property("table.log.arrow.compression.type", "NONE")
             .build()
             .expect("Failed to build table");
 
         create_table(&admin, &table_path, &table_descriptor).await;
 
-        let append_writer = connection
+        let table = connection
             .get_table(&table_path)
             .await
-            .expect("Failed to get table")
+            .expect("Failed to get table");
+
+        let append_writer = table
             .new_append()
             .expect("Failed to create append")
             .create_writer();
@@ -128,6 +134,77 @@ mod table_test {
             .await
             .expect("Failed to append batch");
 
+        append_writer.flush().await.expect("Failed to flush");
+
+        let num_buckets = table.table_info().get_num_buckets();
+        let log_scanner = table.new_scan().create_log_scanner();
+        for bucket_id in 0..num_buckets {
+            log_scanner
+                .subscribe(bucket_id, 0)
+                .await
+                .expect("Failed to subscribe");
+        }
+
+        let scan_records = log_scanner
+            .poll(std::time::Duration::from_secs(5))
+            .await
+            .expect("Failed to poll");
+
+        let mut records: Vec<_> = scan_records.into_iter().collect();
+        records.sort_by_key(|r| r.offset());
+
+        assert_eq!(records.len(), 6, "Should have 6 records");
+        for (i, record) in records.iter().enumerate() {
+            let row = record.row();
+            let expected_c1 = (i + 1) as i32;
+            let expected_c2 = format!("a{}", i + 1);
+            assert_eq!(row.get_int(0), expected_c1, "c1 mismatch at index {}", i);
+            assert_eq!(row.get_string(1), expected_c2, "c2 mismatch at index {}", i);
+        }
+
+        let log_scanner_projected = table
+            .new_scan()
+            .project(&[1, 0])
+            .expect("Failed to project")
+            .create_log_scanner();
+        for bucket_id in 0..num_buckets {
+            log_scanner_projected
+                .subscribe(bucket_id, 0)
+                .await
+                .expect("Failed to subscribe");
+        }
+
+        let scan_records_projected = log_scanner_projected
+            .poll(std::time::Duration::from_secs(5))
+            .await
+            .expect("Failed to poll");
+
+        let mut records_projected: Vec<_> = scan_records_projected.into_iter().collect();
+        records_projected.sort_by_key(|r| r.offset());
+
+        assert_eq!(
+            records_projected.len(),
+            6,
+            "Should have 6 records with projection"
+        );
+        for (i, record) in records_projected.iter().enumerate() {
+            let row = record.row();
+            let expected_c2 = format!("a{}", i + 1);
+            let expected_c1 = (i + 1) as i32;
+            assert_eq!(
+                row.get_string(0),
+                expected_c2,
+                "Projected c2 (first column) mismatch at index {}",
+                i
+            );
+            assert_eq!(
+                row.get_int(1),
+                expected_c1,
+                "Projected c1 (second column) mismatch at index {}",
+                i
+            );
+        }
+
         // Create scanner to verify appended records
         let table = connection
             .get_table(&table_path)

Reply via email to