This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new e4e017c feat: support ColumnPruning (#57)
e4e017c is described below
commit e4e017c22e3f2635105f4d7dcd61cfe7b48a4768
Author: AlexZhao <[email protected]>
AuthorDate: Thu Dec 4 10:58:56 2025 +0800
feat: support ColumnPruning (#57)
---
crates/examples/Cargo.toml | 2 -
crates/fluss/src/client/table/scanner.rs | 115 +++++++++++++--
crates/fluss/src/record/arrow.rs | 239 ++++++++++++++++++++++++++-----
crates/fluss/tests/integration/table.rs | 87 ++++++++++-
4 files changed, 390 insertions(+), 53 deletions(-)
diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml
index 82d864f..dab85b6 100644
--- a/crates/examples/Cargo.toml
+++ b/crates/examples/Cargo.toml
@@ -27,8 +27,6 @@ version = { workspace = true }
fluss = { workspace = true }
tokio = { workspace = true }
clap = { workspace = true}
-
-
[[example]]
name = "example-table"
path = "src/example_table.rs"
\ No newline at end of file
diff --git a/crates/fluss/src/client/table/scanner.rs
b/crates/fluss/src/client/table/scanner.rs
index e1ab59f..13372ef 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -17,12 +17,13 @@
use crate::client::connection::FlussConnection;
use crate::client::metadata::Metadata;
-use crate::error::Result;
+use crate::error::{Error, Result};
use crate::metadata::{TableBucket, TableInfo, TablePath};
use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket,
PbFetchLogReqForTable};
use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord, ScanRecords,
to_arrow_schema};
use crate::rpc::RpcClient;
use crate::util::FairBucketStatusMap;
+use arrow_schema::SchemaRef;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::slice::from_ref;
@@ -39,6 +40,8 @@ pub struct TableScan<'a> {
conn: &'a FlussConnection,
table_info: TableInfo,
metadata: Arc<Metadata>,
+ /// Column indices to project. None means all columns, Some(vec) means
only the specified columns (non-empty).
+ projected_fields: Option<Vec<usize>>,
}
impl<'a> TableScan<'a> {
@@ -47,14 +50,82 @@ impl<'a> TableScan<'a> {
conn,
table_info,
metadata,
+ projected_fields: None,
}
}
- pub fn create_log_scanner(&self) -> LogScanner {
+ /// Projects the scan to only include specified columns by their indices.
+ ///
+ /// # Arguments
+ /// * `column_indices` - Zero-based indices of columns to include in the
scan
+ ///
+ /// # Errors
+ /// Returns an error if `column_indices` is empty or if any column index
is out of range.
+ ///
+ /// # Example
+ /// ```
+ /// let scanner = table.new_scan().project(&[0, 2,
3])?.create_log_scanner();
+ /// ```
+ pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
+ if column_indices.is_empty() {
+ return Err(Error::IllegalArgument(
+ "Column indices cannot be empty".to_string(),
+ ));
+ }
+ let field_count = self.table_info.row_type().fields().len();
+ for &idx in column_indices {
+ if idx >= field_count {
+ return Err(Error::IllegalArgument(format!(
+ "Column index {} out of range (max: {})",
+ idx,
+ field_count - 1
+ )));
+ }
+ }
+ self.projected_fields = Some(column_indices.to_vec());
+ Ok(self)
+ }
+
+ /// Projects the scan to only include specified columns by their names.
+ ///
+ /// # Arguments
+ /// * `column_names` - Names of columns to include in the scan
+ ///
+ /// # Errors
+ /// Returns an error if `column_names` is empty or if any column name is
not found in the table schema.
+ ///
+ /// # Example
+ /// ```
+ /// let scanner = table.new_scan().project_by_name(&["col1",
"col3"])?.create_log_scanner();
+ /// ```
+ pub fn project_by_name(mut self, column_names: &[&str]) -> Result<Self> {
+ if column_names.is_empty() {
+ return Err(Error::IllegalArgument(
+ "Column names cannot be empty".to_string(),
+ ));
+ }
+ let row_type = self.table_info.row_type();
+ let mut indices = Vec::new();
+
+ for name in column_names {
+ let idx = row_type
+ .fields()
+ .iter()
+ .position(|f| f.name() == *name)
+ .ok_or_else(|| Error::IllegalArgument(format!("Column '{name}'
not found")))?;
+ indices.push(idx);
+ }
+
+ self.projected_fields = Some(indices);
+ Ok(self)
+ }
+
+ pub fn create_log_scanner(self) -> LogScanner {
LogScanner::new(
&self.table_info,
self.metadata.clone(),
self.conn.get_connections(),
+ self.projected_fields,
)
}
}
@@ -72,6 +143,7 @@ impl LogScanner {
table_info: &TableInfo,
metadata: Arc<Metadata>,
connections: Arc<RpcClient>,
+ projected_fields: Option<Vec<usize>>,
) -> Self {
let log_scanner_status = Arc::new(LogScannerStatus::new());
Self {
@@ -84,6 +156,7 @@ impl LogScanner {
connections.clone(),
metadata.clone(),
log_scanner_status.clone(),
+ projected_fields,
),
}
}
@@ -114,6 +187,7 @@ struct LogFetcher {
table_info: TableInfo,
metadata: Arc<Metadata>,
log_scanner_status: Arc<LogScannerStatus>,
+ read_context: ReadContext,
}
impl LogFetcher {
@@ -122,13 +196,27 @@ impl LogFetcher {
conns: Arc<RpcClient>,
metadata: Arc<Metadata>,
log_scanner_status: Arc<LogScannerStatus>,
+ projected_fields: Option<Vec<usize>>,
) -> Self {
+ let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
+ let read_context = Self::create_read_context(full_arrow_schema,
projected_fields);
LogFetcher {
table_path: table_info.table_path.clone(),
- conns: conns.clone(),
- table_info: table_info.clone(),
- metadata: metadata.clone(),
- log_scanner_status: log_scanner_status.clone(),
+ conns,
+ table_info,
+ metadata,
+ log_scanner_status,
+ read_context,
+ }
+ }
+
+ fn create_read_context(
+ full_arrow_schema: SchemaRef,
+ projected_fields: Option<Vec<usize>>,
+ ) -> ReadContext {
+ match projected_fields {
+ None => ReadContext::new(full_arrow_schema),
+ Some(fields) =>
ReadContext::with_projection_pushdown(full_arrow_schema, fields),
}
}
@@ -149,7 +237,7 @@ impl LogFetcher {
for pb_fetch_log_resp in fetch_response.tables_resp {
let table_id = pb_fetch_log_resp.table_id;
let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
- let arrow_schema =
to_arrow_schema(self.table_info.get_row_type());
+
for fetch_log_for_bucket in fetch_log_for_buckets {
let mut fetch_records = vec![];
let bucket: i32 = fetch_log_for_bucket.bucket_id;
@@ -158,8 +246,7 @@ impl LogFetcher {
let data = fetch_log_for_bucket.records.unwrap();
for log_record in &mut LogRecordsBatchs::new(&data) {
let last_offset = log_record.last_log_offset();
- fetch_records
-
.extend(log_record.records(ReadContext::new(arrow_schema.clone())));
+
fetch_records.extend(log_record.records(&self.read_context)?);
self.log_scanner_status
.update_offset(&table_bucket, last_offset + 1);
}
@@ -209,13 +296,19 @@ impl LogFetcher {
if ready_for_fetch_count == 0 {
HashMap::new()
} else {
+ let (projection_enabled, projected_fields) =
+ match self.read_context.project_fields_in_order() {
+ None => (false, vec![]),
+ Some(fields) => (true, fields.iter().map(|&i| i as
i32).collect()),
+ };
+
fetch_log_req_for_buckets
.into_iter()
.map(|(leader_id, feq_for_buckets)| {
let req_for_table = PbFetchLogReqForTable {
table_id: table_id.unwrap(),
- projection_pushdown_enabled: false,
- projected_fields: vec![],
+ projection_pushdown_enabled: projection_enabled,
+ projected_fields: projected_fields.clone(),
buckets_req: feq_for_buckets,
};
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 487f50c..29bfe41 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -27,7 +27,12 @@ use arrow::array::{
};
use arrow::{
array::RecordBatch,
- ipc::{reader::StreamReader, writer::StreamWriter},
+ buffer::Buffer,
+ ipc::{
+ reader::{StreamReader, read_record_batch},
+ root_as_message,
+ writer::StreamWriter,
+ },
};
use arrow_schema::SchemaRef;
use arrow_schema::{DataType as ArrowDataType, Field};
@@ -472,41 +477,84 @@ impl<'a> LogRecordBatch<'a> {
LittleEndian::read_i32(&self.data[offset..offset +
RECORDS_COUNT_LENGTH])
}
- pub fn records(&self, read_context: ReadContext) -> LogRecordIterator {
- let count = self.record_count();
- if count == 0 {
- return LogRecordIterator::empty();
+ pub fn records(&self, read_context: &ReadContext) ->
Result<LogRecordIterator> {
+ if self.record_count() == 0 {
+ return Ok(LogRecordIterator::empty());
}
- // get arrow_metadata
- let arrow_metadata_bytes = read_context.to_arrow_metadata().unwrap();
- // arrow_batch_data
let data = &self.data[RECORDS_OFFSET..];
- // need to combine arrow_metadata_bytes + arrow_batch_data
- let cursor = Cursor::new([&arrow_metadata_bytes, data].concat());
- let mut stream_reader = StreamReader::try_new(cursor, None).unwrap();
-
- let mut record_batch = None;
- if let Some(bath) = stream_reader.next() {
- record_batch = Some(bath.unwrap());
- }
-
- if record_batch.is_none() {
- return LogRecordIterator::empty();
- }
-
- let arrow_reader = ArrowReader::new(Arc::new(record_batch.unwrap()));
- LogRecordIterator::Arrow(ArrowLogRecordIterator {
- reader: arrow_reader,
- base_offset: self.base_log_offset(),
- timestamp: self.commit_timestamp(),
- row_id: 0,
- change_type: ChangeType::AppendOnly,
- })
+ let record_batch = read_context.record_batch(data)?;
+ let log_record_iterator = match record_batch {
+ None => LogRecordIterator::empty(),
+ Some(record_batch) => {
+ let arrow_reader = ArrowReader::new(Arc::new(record_batch));
+ LogRecordIterator::Arrow(ArrowLogRecordIterator {
+ reader: arrow_reader,
+ base_offset: self.base_log_offset(),
+ timestamp: self.commit_timestamp(),
+ row_id: 0,
+ change_type: ChangeType::AppendOnly,
+ })
+ }
+ };
+ Ok(log_record_iterator)
}
}
+/// Parse an Arrow IPC message from a byte slice.
+///
+/// Server returns RecordBatch message (without Schema message) in the
encapsulated message format.
+/// Format: [continuation: 4 bytes (0xFFFFFFFF)][metadata_size: 4
bytes][RecordBatch metadata][body]
+///
+/// This format is documented at:
+///
https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
+///
+/// # Arguments
+/// * `data` - The byte slice containing the IPC message.
+///
+/// # Returns
+/// Returns `Some((batch_metadata, body_buffer, version))` on success:
+/// - `batch_metadata`: The RecordBatch metadata from the IPC message.
+/// - `body_buffer`: The buffer containing the record batch body data.
+/// - `version`: The Arrow IPC metadata version.
+///
+/// Returns `None` if the data is malformed or too short.
+fn parse_ipc_message(
+ data: &[u8],
+) -> Option<(
+ arrow::ipc::RecordBatch<'_>,
+ Buffer,
+ arrow::ipc::MetadataVersion,
+)> {
+ const CONTINUATION_MARKER: u32 = 0xFFFFFFFF;
+
+ if data.len() < 8 {
+ return None;
+ }
+
+ let continuation = LittleEndian::read_u32(&data[0..4]);
+ let metadata_size = LittleEndian::read_u32(&data[4..8]) as usize;
+
+ if continuation != CONTINUATION_MARKER {
+ return None;
+ }
+
+ if data.len() < 8 + metadata_size {
+ return None;
+ }
+
+ let metadata_bytes = &data[8..8 + metadata_size];
+ let message = root_as_message(metadata_bytes).ok()?;
+ let batch_metadata = message.header_as_record_batch()?;
+
+ let body_start = 8 + metadata_size;
+ let body_data = &data[body_start..];
+ let body_buffer = Buffer::from(body_data);
+
+ Some((batch_metadata, body_buffer, message.version()))
+}
+
pub fn to_arrow_schema(fluss_schema: &DataType) -> SchemaRef {
match &fluss_schema {
DataType::Row(row_type) => {
@@ -554,19 +602,140 @@ pub fn to_arrow_type(fluss_type: &DataType) ->
ArrowDataType {
}
}
+#[derive(Clone)]
pub struct ReadContext {
- arrow_schema: SchemaRef,
+ target_schema: SchemaRef,
+
+ projection: Option<Projection>,
+}
+
+#[derive(Clone)]
+struct Projection {
+ ordered_schema: SchemaRef,
+ projected_fields: Vec<usize>,
+ ordered_fields: Vec<usize>,
+
+ reordering_indexes: Vec<usize>,
+ reordering_needed: bool,
}
impl ReadContext {
pub fn new(arrow_schema: SchemaRef) -> ReadContext {
- ReadContext { arrow_schema }
+ ReadContext {
+ target_schema: arrow_schema,
+ projection: None,
+ }
}
- pub fn to_arrow_metadata(&self) -> Result<Vec<u8>> {
- let mut arrow_schema_bytes = vec![];
- let _writer = StreamWriter::try_new(&mut arrow_schema_bytes,
&self.arrow_schema)?;
- Ok(arrow_schema_bytes)
+ pub fn with_projection_pushdown(
+ arrow_schema: SchemaRef,
+ projected_fields: Vec<usize>,
+ ) -> ReadContext {
+ let target_schema = Self::project_schema(arrow_schema.clone(),
projected_fields.as_slice());
+ let mut sorted_fields = projected_fields.clone();
+ sorted_fields.sort_unstable();
+
+ let project = {
+ if !sorted_fields.eq(&projected_fields) {
+ // reordering is required
+ // Calculate reordering indexes to transform from sorted order
to user-requested order
+ let mut reordering_indexes =
Vec::with_capacity(projected_fields.len());
+ for &original_idx in &projected_fields {
+ let pos = sorted_fields
+ .binary_search(&original_idx)
+ .expect("projection index should exist in sorted
list");
+ reordering_indexes.push(pos);
+ }
+ Projection {
+ ordered_schema: Self::project_schema(
+ arrow_schema.clone(),
+ sorted_fields.as_slice(),
+ ),
+ projected_fields,
+ ordered_fields: sorted_fields,
+ reordering_indexes,
+ reordering_needed: true,
+ }
+ } else {
+ Projection {
+ ordered_schema: Self::project_schema(arrow_schema,
projected_fields.as_slice()),
+ ordered_fields: projected_fields.clone(),
+ projected_fields,
+ reordering_indexes: vec![],
+ reordering_needed: false,
+ }
+ }
+ };
+
+ ReadContext {
+ target_schema,
+ projection: Some(project),
+ }
+ }
+
+ pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) ->
SchemaRef {
+ // TODO: return a Result instead of panicking when the schema projection fails
+ SchemaRef::new(
+ schema
+ .project(projected_fields)
+ .expect("can't project schema"),
+ )
+ }
+
+ pub fn project_fields(&self) -> Option<&[usize]> {
+ self.projection
+ .as_ref()
+ .map(|p| p.projected_fields.as_slice())
+ }
+
+ pub fn project_fields_in_order(&self) -> Option<&[usize]> {
+ self.projection
+ .as_ref()
+ .map(|p| p.ordered_fields.as_slice())
+ }
+
+ pub fn record_batch(&self, data: &[u8]) -> Result<Option<RecordBatch>> {
+ let (batch_metadata, body_buffer, version) = match
parse_ipc_message(data) {
+ Some(result) => result,
+ None => return Ok(None),
+ };
+
+ // the record batch returned by the server is always ordered by field position;
+ // use the projection (if any) to decide which arrow schema to apply
+ // when parsing the record batch
+ let resolve_schema = match self.projection {
+ Some(ref projection) => {
+ // projection, should use ordered schema by project field pos
+ projection.ordered_schema.clone()
+ }
+ None => {
+ // no projection, use target output schema
+ self.target_schema.clone()
+ }
+ };
+
+ let record_batch = read_record_batch(
+ &body_buffer,
+ batch_metadata,
+ resolve_schema,
+ &std::collections::HashMap::new(),
+ None,
+ &version,
+ )?;
+
+ let record_batch = match &self.projection {
+ Some(projection) if projection.reordering_needed => {
+ // Reorder columns if needed (when projection pushdown with
non-sorted order)
+ let reordered_columns: Vec<_> = projection
+ .reordering_indexes
+ .iter()
+ .map(|&idx| record_batch.column(idx).clone())
+ .collect();
+ RecordBatch::try_new(self.target_schema.clone(),
reordered_columns)?
+ }
+ _ => record_batch,
+ };
+ Ok(Some(record_batch))
}
}
diff --git a/crates/fluss/tests/integration/table.rs
b/crates/fluss/tests/integration/table.rs
index aa02724..e14b852 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -54,6 +54,7 @@ mod table_test {
})
.join()
.expect("Failed to create cluster");
+
// wait for 20 seconds to avoid the error like
// CoordinatorEventProcessor is not initialized yet
thread::sleep(std::time::Duration::from_secs(20));
@@ -84,14 +85,16 @@ mod table_test {
}
#[tokio::test]
- async fn append_record_batch() {
+ async fn append_record_batch_and_scan() {
let cluster = get_fluss_cluster();
let connection = cluster.get_fluss_connection().await;
let admin = connection.get_admin().await.expect("Failed to get admin");
- let table_path =
- TablePath::new("fluss".to_string(),
"test_append_record_batch".to_string());
+ let table_path = TablePath::new(
+ "fluss".to_string(),
+ "test_append_record_batch_and_scan".to_string(),
+ );
let table_descriptor = TableDescriptor::builder()
.schema(
@@ -101,15 +104,18 @@ mod table_test {
.build()
.expect("Failed to build schema"),
)
+ .property("table.log.arrow.compression.type", "NONE")
.build()
.expect("Failed to build table");
create_table(&admin, &table_path, &table_descriptor).await;
- let append_writer = connection
+ let table = connection
.get_table(&table_path)
.await
- .expect("Failed to get table")
+ .expect("Failed to get table");
+
+ let append_writer = table
.new_append()
.expect("Failed to create append")
.create_writer();
@@ -128,6 +134,77 @@ mod table_test {
.await
.expect("Failed to append batch");
+ append_writer.flush().await.expect("Failed to flush");
+
+ let num_buckets = table.table_info().get_num_buckets();
+ let log_scanner = table.new_scan().create_log_scanner();
+ for bucket_id in 0..num_buckets {
+ log_scanner
+ .subscribe(bucket_id, 0)
+ .await
+ .expect("Failed to subscribe");
+ }
+
+ let scan_records = log_scanner
+ .poll(std::time::Duration::from_secs(5))
+ .await
+ .expect("Failed to poll");
+
+ let mut records: Vec<_> = scan_records.into_iter().collect();
+ records.sort_by_key(|r| r.offset());
+
+ assert_eq!(records.len(), 6, "Should have 6 records");
+ for (i, record) in records.iter().enumerate() {
+ let row = record.row();
+ let expected_c1 = (i + 1) as i32;
+ let expected_c2 = format!("a{}", i + 1);
+ assert_eq!(row.get_int(0), expected_c1, "c1 mismatch at index {}",
i);
+ assert_eq!(row.get_string(1), expected_c2, "c2 mismatch at index
{}", i);
+ }
+
+ let log_scanner_projected = table
+ .new_scan()
+ .project(&[1, 0])
+ .expect("Failed to project")
+ .create_log_scanner();
+ for bucket_id in 0..num_buckets {
+ log_scanner_projected
+ .subscribe(bucket_id, 0)
+ .await
+ .expect("Failed to subscribe");
+ }
+
+ let scan_records_projected = log_scanner_projected
+ .poll(std::time::Duration::from_secs(5))
+ .await
+ .expect("Failed to poll");
+
+ let mut records_projected: Vec<_> =
scan_records_projected.into_iter().collect();
+ records_projected.sort_by_key(|r| r.offset());
+
+ assert_eq!(
+ records_projected.len(),
+ 6,
+ "Should have 6 records with projection"
+ );
+ for (i, record) in records_projected.iter().enumerate() {
+ let row = record.row();
+ let expected_c2 = format!("a{}", i + 1);
+ let expected_c1 = (i + 1) as i32;
+ assert_eq!(
+ row.get_string(0),
+ expected_c2,
+ "Projected c2 (first column) mismatch at index {}",
+ i
+ );
+ assert_eq!(
+ row.get_int(1),
+ expected_c1,
+ "Projected c1 (second column) mismatch at index {}",
+ i
+ );
+ }
+
// Create scanner to verify appended records
let table = connection
.get_table(&table_path)