Copilot commented on code in PR #57:
URL: https://github.com/apache/fluss-rust/pull/57#discussion_r2585871798
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -122,13 +196,29 @@ impl LogFetcher {
conns: Arc<RpcClient>,
metadata: Arc<Metadata>,
log_scanner_status: Arc<LogScannerStatus>,
+ projected_fields: Option<Vec<usize>>,
) -> Self {
+ let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
+ let read_context = Self::create_read_context(full_arrow_schema, projected_fields);
LogFetcher {
table_path: table_info.table_path.clone(),
- conns: conns.clone(),
- table_info: table_info.clone(),
- metadata: metadata.clone(),
- log_scanner_status: log_scanner_status.clone(),
+ conns,
+ table_info,
+ metadata,
+ log_scanner_status,
+ read_context,
+ }
+ }
+
+ fn create_read_context(
+ full_arrow_schema: SchemaRef,
+ projected_fields: Option<Vec<usize>>,
+ ) -> ReadContext {
+ match projected_fields {
+ None => ReadContext::new(full_arrow_schema),
+ Some(fields) => {
+ ReadContext::with_projection_pushdown(full_arrow_schema.clone(), fields)
Review Comment:
[nitpick] The `clone()` here is unnecessary: `full_arrow_schema` is already a
`SchemaRef` (an `Arc<Schema>`), so cloning it only increments the reference
count, and since this is the value's last use it can simply be moved. Consider
removing the `.clone()`:
```rust
Some(fields) => ReadContext::with_projection_pushdown(full_arrow_schema, fields)
```
```suggestion
ReadContext::with_projection_pushdown(full_arrow_schema, fields)
```
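For context, a standalone demonstration (hypothetical code, not from this PR) of why the clone is cheap but still avoidable: cloning an `Arc` copies a pointer and bumps a reference count, while moving the value skips even that.
```rust
use std::sync::Arc;

fn main() {
    // Stand-in for a `SchemaRef` (`Arc<Schema>`); a String keeps the demo self-contained.
    let schema = Arc::new(String::from("pretend this is an arrow Schema"));
    let extra_handle = Arc::clone(&schema);
    // The clone copied a pointer and bumped the count; nothing was deep-copied.
    assert_eq!(Arc::strong_count(&schema), 2);
    drop(extra_handle);
    // Moving `schema` into a consumer, as the suggestion does, needs no clone at all.
    assert_eq!(Arc::strong_count(&schema), 1);
}
```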
##########
crates/fluss/src/record/arrow.rs:
##########
@@ -554,19 +589,135 @@ pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType {
}
}
+#[derive(Clone)]
pub struct ReadContext {
- arrow_schema: SchemaRef,
+ target_schema: SchemaRef,
+
+ projection: Option<Projection>,
+}
+
+#[derive(Clone)]
+struct Projection {
+ ordered_schema: SchemaRef,
+ projected_fields: Vec<usize>,
+
+ reordering_indexes: Vec<usize>,
+ reordering_needed: bool,
}
impl ReadContext {
pub fn new(arrow_schema: SchemaRef) -> ReadContext {
- ReadContext { arrow_schema }
+ ReadContext {
+ target_schema: arrow_schema,
+ projection: None,
+ }
}
- pub fn to_arrow_metadata(&self) -> Result<Vec<u8>> {
- let mut arrow_schema_bytes = vec![];
- let _writer = StreamWriter::try_new(&mut arrow_schema_bytes, &self.arrow_schema)?;
- Ok(arrow_schema_bytes)
+ pub fn with_projection_pushdown(
+ arrow_schema: SchemaRef,
+ projected_fields: Vec<usize>,
+ ) -> ReadContext {
+ let target_schema = Self::project_schema(arrow_schema.clone(), projected_fields.as_slice());
+ let mut sorted_fields = projected_fields.clone();
+ sorted_fields.sort_unstable();
+
+ let project = {
+ if !sorted_fields.eq(&projected_fields) {
+ // reordering is required
+ // Calculate reordering indexes to transform from sorted order to user-requested order
+ let mut reordering_indexes = Vec::with_capacity(projected_fields.len());
+ for &original_idx in &projected_fields {
+ let pos = sorted_fields
+ .binary_search(&original_idx)
+ .expect("projection index should exist in sorted list");
+ reordering_indexes.push(pos);
+ }
+ Projection {
+ ordered_schema: Self::project_schema(
+ arrow_schema.clone(),
+ sorted_fields.as_slice(),
+ ),
+ projected_fields,
+ reordering_indexes,
+ reordering_needed: true,
+ }
+ } else {
+ Projection {
+ ordered_schema: Self::project_schema(arrow_schema, projected_fields.as_slice()),
+ projected_fields,
+ reordering_indexes: vec![],
+ reordering_needed: false,
+ }
+ }
+ };
+
+ ReadContext {
+ target_schema,
+ projection: Some(project),
+ }
+ }
+
+ pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> SchemaRef {
+ // todo: handle the exception
Review Comment:
[nitpick] The TODO comment "todo: handle the exception" should either be
implemented or replaced with a more descriptive comment explaining what error
handling is needed and when it will be addressed. If schema projection can
fail here (which the `expect` suggests), it should be handled properly, as
noted in the related comment on the `expect` call later in this review; a
possible wording for the comment is sketched below.
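As an illustration, a more descriptive comment might read roughly as follows (the wording is hypothetical; the facts are taken from this diff):
```rust
// TODO: `Schema::project` returns an error for out-of-range indices. Indices
// are currently validated upfront in `TableScan::project`, so the `expect`
// below should be unreachable, but this should still return a `Result`
// instead of panicking; see the review comment on the `expect` call for a
// sketch of the fallible version.
```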
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -47,14 +50,82 @@ impl<'a> TableScan<'a> {
conn,
table_info,
metadata,
+ projected_fields: None,
}
}
- pub fn create_log_scanner(&self) -> LogScanner {
+ /// Projects the scan to only include specified columns by their indices.
+ ///
+ /// # Arguments
+ /// * `column_indices` - Zero-based indices of columns to include in the scan
+ ///
+ /// # Errors
+ /// Returns an error if `column_indices` is empty or if any column index is out of range.
+ ///
+ /// # Example
+ /// ```
+ /// let scanner = table.new_scan().project(&[0, 2, 3])?.create_log_scanner();
+ /// ```
+ pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
+ if column_indices.is_empty() {
+ return Err(Error::IllegalArgument(
+ "Column indices cannot be empty".to_string(),
+ ));
+ }
+ let field_count = self.table_info.row_type().fields().len();
+ for &idx in column_indices {
+ if idx >= field_count {
+ return Err(Error::IllegalArgument(format!(
+ "Column index {} out of range (max: {})",
+ idx,
+ field_count - 1
+ )));
+ }
+ }
+ self.projected_fields = Some(column_indices.to_vec());
+ Ok(self)
+ }
+
+ /// Projects the scan to only include specified columns by their names.
+ ///
+ /// # Arguments
+ /// * `column_names` - Names of columns to include in the scan
+ ///
+ /// # Errors
+ /// Returns an error if `column_names` is empty or if any column name is not found in the table schema.
+ ///
+ /// # Example
+ /// ```
+ /// let scanner = table.new_scan().project_by_name(&["col1", "col3"])?.create_log_scanner();
+ /// ```
Review Comment:
The documentation examples in both `project` and `project_by_name` are not
compilable Rust doc tests: they're missing the necessary setup code (like
`table`), so they will fail as soon as `cargo test` compiles them. Consider either:
1. Marking them `ignore`, or `no_run` plus hidden setup lines (`no_run` skips execution but still compiles the example)
2. Providing complete examples with proper context
Example for `project`:
```rust
/// # Example
/// ```no_run
/// # use fluss::client::FlussConnection;
/// # async fn example() -> fluss::error::Result<()> {
/// # let conn = FlussConnection::new(config).await?;
/// # let table = conn.get_table(&table_path).await?;
/// let scanner = table.new_scan().project(&[0, 2, 3])?.create_log_scanner();
/// # Ok(())
/// # }
/// ```
```
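A matching sketch for `project_by_name`, reusing the same hypothetical setup (`config` and `table_path` are undefined placeholders, exactly as in the example above):
```rust
/// # Example
/// ```no_run
/// # use fluss::client::FlussConnection;
/// # async fn example() -> fluss::error::Result<()> {
/// # let conn = FlussConnection::new(config).await?;
/// # let table = conn.get_table(&table_path).await?;
/// let scanner = table.new_scan().project_by_name(&["col1", "col3"])?.create_log_scanner();
/// # Ok(())
/// # }
/// ```
```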
##########
crates/fluss/src/record/arrow.rs:
##########
@@ -554,19 +589,135 @@ pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType {
}
}
+#[derive(Clone)]
pub struct ReadContext {
- arrow_schema: SchemaRef,
+ target_schema: SchemaRef,
+
+ projection: Option<Projection>,
+}
+
+#[derive(Clone)]
+struct Projection {
+ ordered_schema: SchemaRef,
+ projected_fields: Vec<usize>,
+
+ reordering_indexes: Vec<usize>,
+ reordering_needed: bool,
}
impl ReadContext {
pub fn new(arrow_schema: SchemaRef) -> ReadContext {
- ReadContext { arrow_schema }
+ ReadContext {
+ target_schema: arrow_schema,
+ projection: None,
+ }
}
- pub fn to_arrow_metadata(&self) -> Result<Vec<u8>> {
- let mut arrow_schema_bytes = vec![];
- let _writer = StreamWriter::try_new(&mut arrow_schema_bytes, &self.arrow_schema)?;
- Ok(arrow_schema_bytes)
+ pub fn with_projection_pushdown(
+ arrow_schema: SchemaRef,
+ projected_fields: Vec<usize>,
+ ) -> ReadContext {
+ let target_schema = Self::project_schema(arrow_schema.clone(), projected_fields.as_slice());
+ let mut sorted_fields = projected_fields.clone();
+ sorted_fields.sort_unstable();
+
+ let project = {
+ if !sorted_fields.eq(&projected_fields) {
+ // reordering is required
+ // Calculate reordering indexes to transform from sorted order to user-requested order
+ let mut reordering_indexes = Vec::with_capacity(projected_fields.len());
+ for &original_idx in &projected_fields {
+ let pos = sorted_fields
+ .binary_search(&original_idx)
+ .expect("projection index should exist in sorted list");
+ reordering_indexes.push(pos);
+ }
+ Projection {
+ ordered_schema: Self::project_schema(
+ arrow_schema.clone(),
+ sorted_fields.as_slice(),
+ ),
+ projected_fields,
+ reordering_indexes,
+ reordering_needed: true,
+ }
+ } else {
+ Projection {
+ ordered_schema: Self::project_schema(arrow_schema, projected_fields.as_slice()),
+ projected_fields,
+ reordering_indexes: vec![],
+ reordering_needed: false,
+ }
+ }
+ };
+
+ ReadContext {
+ target_schema,
+ projection: Some(project),
+ }
+ }
+
+ pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> SchemaRef {
+ // todo: handle the exception
+ SchemaRef::new(
+ schema
+ .project(projected_fields)
+ .expect("can't project schema"),
+ )
+ }
+
+ pub fn is_projection_pushdown(&self) -> bool {
+ self.projection.is_some()
+ }
+
+ pub fn project_fields(&self) -> Option<&[usize]> {
+ self.projection
+ .as_ref()
+ .map(|p| p.projected_fields.as_slice())
+ }
+
+ pub fn record_batch(&self, data: &[u8]) -> Result<Option<RecordBatch>> {
+ let (batch_metadata, body_buffer, version) = match parse_ipc_message(data) {
+ Some(result) => result,
+ None => return Ok(None),
+ };
+
+ // the record batch from server must be ordered by field pos
+ // according to project to decide what arrow schema to use
+ // to parse the record batch
+ let resolve_schema = match self.projection {
+ Some(ref projection) => {
+ // projection, should use ordered schema by project field pos
+ projection.ordered_schema.clone()
+ }
+ None => {
+ // no projection, use target output schema
+ self.target_schema.clone()
+ }
+ };
+
+ let record_batch = read_record_batch(
+ &body_buffer,
+ batch_metadata,
+ resolve_schema,
+ &std::collections::HashMap::new(),
+ None,
+ &version,
+ )?;
+
+ let record_batch = match &self.projection {
+ Some(projection) if projection.reordering_needed => {
+ // Reorder columns if needed (when projection pushdown with non-sorted order
+ let reordered_columns: Vec<_> = projection
+ .reordering_indexes
+ .iter()
+ .map(|&idx| record_batch.column(idx).clone())
+ .collect();
+ RecordBatch::try_new(record_batch.schema(), reordered_columns)?
Review Comment:
When `reordering_needed` is true, the RecordBatch is recreated using the
original `record_batch.schema()` instead of `self.target_schema`. This means
the reordered batch will have the sorted schema rather than the target schema
that matches the user-requested column order. This could lead to schema
mismatches.
Consider using `self.target_schema.clone()` instead:
```rust
RecordBatch::try_new(self.target_schema.clone(), reordered_columns)?
```
```suggestion
RecordBatch::try_new(self.target_schema.clone(), reordered_columns)?
```
##########
crates/fluss/src/record/arrow.rs:
##########
@@ -472,39 +477,69 @@ impl<'a> LogRecordBatch<'a> {
LittleEndian::read_i32(&self.data[offset..offset + RECORDS_COUNT_LENGTH])
}
- pub fn records(&self, read_context: ReadContext) -> LogRecordIterator {
- let count = self.record_count();
- if count == 0 {
- return LogRecordIterator::empty();
+ pub fn records(&self, read_context: &ReadContext) -> Result<LogRecordIterator> {
+ if self.record_count() == 0 {
+ return Ok(LogRecordIterator::empty());
}
- // get arrow_metadata
- let arrow_metadata_bytes = read_context.to_arrow_metadata().unwrap();
- // arrow_batch_data
let data = &self.data[RECORDS_OFFSET..];
- // need to combine arrow_metadata_bytes + arrow_batch_data
- let cursor = Cursor::new([&arrow_metadata_bytes, data].concat());
- let mut stream_reader = StreamReader::try_new(cursor, None).unwrap();
+ let record_batch = read_context.record_batch(data)?;
+ let log_record_iterator = match record_batch {
+ None => LogRecordIterator::empty(),
+ Some(record_batch) => {
+ let arrow_reader = ArrowReader::new(Arc::new(record_batch));
+ LogRecordIterator::Arrow(ArrowLogRecordIterator {
+ reader: arrow_reader,
+ base_offset: self.base_log_offset(),
+ timestamp: self.commit_timestamp(),
+ row_id: 0,
+ change_type: ChangeType::AppendOnly,
+ })
+ }
+ };
+ Ok(log_record_iterator)
+ }
+}
- let mut record_batch = None;
- if let Some(bath) = stream_reader.next() {
- record_batch = Some(bath.unwrap());
- }
+/// Parse an Arrow IPC message from a byte slice.
+///
+/// Server returns RecordBatch message (without Schema message) in the encapsulated message format.
+/// Format: [continuation: 4 bytes (0xFFFFFFFF)][metadata_size: 4 bytes][RecordBatch metadata][body]
+///
+/// This format is documented at:
+/// https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
+///
+/// Returns the RecordBatch metadata, body buffer, and metadata version.
Review Comment:
[nitpick] Missing documentation for the `parse_ipc_message` function
parameters and return value. The function has good documentation about the
format, but should also document what each parameter means and what the return
tuple contains.
Consider adding:
```rust
/// # Arguments
/// * `data` - The byte slice containing the IPC message
///
/// # Returns
/// Returns `Some((batch_metadata, body_buffer, version))` on success:
/// - `batch_metadata`: The RecordBatch metadata from the IPC message
/// - `body_buffer`: The buffer containing the record batch body data
/// - `version`: The Arrow IPC metadata version
///
/// Returns `None` if the data is malformed or too short.
```
```suggestion
/// # Arguments
/// * `data` - The byte slice containing the IPC message.
///
/// # Returns
/// Returns `Some((batch_metadata, body_buffer, version))` on success:
/// - `batch_metadata`: The RecordBatch metadata from the IPC message.
/// - `body_buffer`: The buffer containing the record batch body data.
/// - `version`: The Arrow IPC metadata version.
///
/// Returns `None` if the data is malformed or too short.
```
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -47,14 +50,82 @@ impl<'a> TableScan<'a> {
conn,
table_info,
metadata,
+ projected_fields: None,
}
}
- pub fn create_log_scanner(&self) -> LogScanner {
+ /// Projects the scan to only include specified columns by their indices.
+ ///
+ /// # Arguments
+ /// * `column_indices` - Zero-based indices of columns to include in the scan
+ ///
+ /// # Errors
+ /// Returns an error if `column_indices` is empty or if any column index is out of range.
+ ///
+ /// # Example
+ /// ```
+ /// let scanner = table.new_scan().project(&[0, 2, 3])?.create_log_scanner();
+ /// ```
+ pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
+ if column_indices.is_empty() {
+ return Err(Error::IllegalArgument(
+ "Column indices cannot be empty".to_string(),
+ ));
+ }
+ let field_count = self.table_info.row_type().fields().len();
+ for &idx in column_indices {
+ if idx >= field_count {
+ return Err(Error::IllegalArgument(format!(
+ "Column index {} out of range (max: {})",
+ idx,
+ field_count - 1
+ )));
+ }
+ }
+ self.projected_fields = Some(column_indices.to_vec());
Review Comment:
The `project` method doesn't check for duplicate column indices. If a user
calls `project(&[0, 0, 2])`, the same column index is added twice to
`projected_fields`, which could lead to unexpected behavior in projection.
Consider adding validation to reject duplicate indices or de-duplicate them;
one option is sketched below.
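A minimal sketch of such a check, written as a free-standing function for illustration (in the real fix this would live inside `TableScan::project` and return the crate's `Error::IllegalArgument`, as the surrounding code does):
```rust
use std::collections::HashSet;

fn check_no_duplicate_indices(column_indices: &[usize]) -> Result<(), String> {
    let mut seen = HashSet::with_capacity(column_indices.len());
    for &idx in column_indices {
        // `insert` returns false when the value was already present.
        if !seen.insert(idx) {
            return Err(format!("Duplicate column index {idx}"));
        }
    }
    Ok(())
}
```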
##########
crates/fluss/src/record/arrow.rs:
##########
@@ -554,19 +589,135 @@ pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType {
}
}
+#[derive(Clone)]
pub struct ReadContext {
- arrow_schema: SchemaRef,
+ target_schema: SchemaRef,
+
+ projection: Option<Projection>,
+}
+
+#[derive(Clone)]
+struct Projection {
+ ordered_schema: SchemaRef,
+ projected_fields: Vec<usize>,
+
+ reordering_indexes: Vec<usize>,
+ reordering_needed: bool,
}
impl ReadContext {
pub fn new(arrow_schema: SchemaRef) -> ReadContext {
- ReadContext { arrow_schema }
+ ReadContext {
+ target_schema: arrow_schema,
+ projection: None,
+ }
}
- pub fn to_arrow_metadata(&self) -> Result<Vec<u8>> {
- let mut arrow_schema_bytes = vec![];
- let _writer = StreamWriter::try_new(&mut arrow_schema_bytes, &self.arrow_schema)?;
- Ok(arrow_schema_bytes)
+ pub fn with_projection_pushdown(
+ arrow_schema: SchemaRef,
+ projected_fields: Vec<usize>,
+ ) -> ReadContext {
+ let target_schema = Self::project_schema(arrow_schema.clone(), projected_fields.as_slice());
+ let mut sorted_fields = projected_fields.clone();
+ sorted_fields.sort_unstable();
+
+ let project = {
+ if !sorted_fields.eq(&projected_fields) {
+ // reordering is required
+ // Calculate reordering indexes to transform from sorted order to user-requested order
+ let mut reordering_indexes = Vec::with_capacity(projected_fields.len());
+ for &original_idx in &projected_fields {
+ let pos = sorted_fields
+ .binary_search(&original_idx)
+ .expect("projection index should exist in sorted list");
+ reordering_indexes.push(pos);
+ }
+ Projection {
+ ordered_schema: Self::project_schema(
+ arrow_schema.clone(),
+ sorted_fields.as_slice(),
+ ),
+ projected_fields,
+ reordering_indexes,
+ reordering_needed: true,
+ }
+ } else {
+ Projection {
+ ordered_schema: Self::project_schema(arrow_schema, projected_fields.as_slice()),
+ projected_fields,
+ reordering_indexes: vec![],
+ reordering_needed: false,
+ }
+ }
+ };
+
+ ReadContext {
+ target_schema,
+ projection: Some(project),
+ }
+ }
+
+ pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> SchemaRef {
+ // todo: handle the exception
+ SchemaRef::new(
+ schema
+ .project(projected_fields)
+ .expect("can't project schema"),
+ )
+ }
+
+ pub fn is_projection_pushdown(&self) -> bool {
+ self.projection.is_some()
+ }
+
+ pub fn project_fields(&self) -> Option<&[usize]> {
+ self.projection
+ .as_ref()
+ .map(|p| p.projected_fields.as_slice())
+ }
+
+ pub fn record_batch(&self, data: &[u8]) -> Result<Option<RecordBatch>> {
+ let (batch_metadata, body_buffer, version) = match parse_ipc_message(data) {
+ Some(result) => result,
+ None => return Ok(None),
+ };
+
+ // the record batch from server must be ordered by field pos
+ // according to project to decide what arrow schema to use
+ // to parse the record batch
+ let resolve_schema = match self.projection {
+ Some(ref projection) => {
+ // projection, should use ordered schema by project field pos
+ projection.ordered_schema.clone()
+ }
+ None => {
+ // no projection, use target output schema
+ self.target_schema.clone()
+ }
+ };
+
+ let record_batch = read_record_batch(
+ &body_buffer,
+ batch_metadata,
+ resolve_schema,
+ &std::collections::HashMap::new(),
+ None,
+ &version,
+ )?;
+
+ let record_batch = match &self.projection {
+ Some(projection) if projection.reordering_needed => {
+ // Reorder columns if needed (when projection pushdown with non-sorted order
Review Comment:
The comment says "Reorder columns if needed (when projection pushdown with
non-sorted order" but is missing a closing parenthesis. Should be:
```rust
// Reorder columns if needed (when projection pushdown with non-sorted order)
```
```suggestion
// Reorder columns if needed (when projection pushdown with non-sorted order)
```
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -47,14 +50,82 @@ impl<'a> TableScan<'a> {
conn,
table_info,
metadata,
+ projected_fields: None,
}
}
- pub fn create_log_scanner(&self) -> LogScanner {
+ /// Projects the scan to only include specified columns by their indices.
+ ///
+ /// # Arguments
+ /// * `column_indices` - Zero-based indices of columns to include in the scan
+ ///
+ /// # Errors
+ /// Returns an error if `column_indices` is empty or if any column index is out of range.
+ ///
+ /// # Example
+ /// ```
+ /// let scanner = table.new_scan().project(&[0, 2, 3])?.create_log_scanner();
+ /// ```
+ pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
+ if column_indices.is_empty() {
+ return Err(Error::IllegalArgument(
+ "Column indices cannot be empty".to_string(),
+ ));
+ }
+ let field_count = self.table_info.row_type().fields().len();
+ for &idx in column_indices {
+ if idx >= field_count {
+ return Err(Error::IllegalArgument(format!(
+ "Column index {} out of range (max: {})",
+ idx,
+ field_count - 1
+ )));
+ }
+ }
+ self.projected_fields = Some(column_indices.to_vec());
+ Ok(self)
+ }
+
+ /// Projects the scan to only include specified columns by their names.
+ ///
+ /// # Arguments
+ /// * `column_names` - Names of columns to include in the scan
+ ///
+ /// # Errors
+ /// Returns an error if `column_names` is empty or if any column name is not found in the table schema.
+ ///
+ /// # Example
+ /// ```
+ /// let scanner = table.new_scan().project_by_name(&["col1", "col3"])?.create_log_scanner();
+ /// ```
+ pub fn project_by_name(mut self, column_names: &[&str]) -> Result<Self> {
+ if column_names.is_empty() {
+ return Err(Error::IllegalArgument(
+ "Column names cannot be empty".to_string(),
+ ));
+ }
+ let row_type = self.table_info.row_type();
+ let mut indices = Vec::new();
+
+ for name in column_names {
+ let idx = row_type
+ .fields()
+ .iter()
+ .position(|f| f.name() == *name)
+ .ok_or_else(|| Error::IllegalArgument(format!("Column '{name}' not found")))?;
+ indices.push(idx);
+ }
Review Comment:
The `project_by_name` method doesn't check for duplicate column names. If a
user calls `project_by_name(&["col1", "col1"])`, it will add the same column
index twice to `indices`, which could lead to unexpected behavior in
projection. Consider adding validation to reject duplicate column names or
de-duplicate them; one option is sketched below.
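One way to cover both methods with a single validation path (a sketch, not this PR's code; it assumes the crate's `Result` alias and the `Error::IllegalArgument` variant used elsewhere in this diff): resolve the names to indices, then delegate to `project`, so the empty/range/duplicate checks live in one place.
```rust
pub fn project_by_name(self, column_names: &[&str]) -> Result<Self> {
    let row_type = self.table_info.row_type();
    let indices = column_names
        .iter()
        .map(|name| {
            row_type
                .fields()
                .iter()
                .position(|f| f.name() == *name)
                .ok_or_else(|| Error::IllegalArgument(format!("Column '{name}' not found")))
        })
        .collect::<Result<Vec<_>>>()?;
    // `project` already rejects empty input and out-of-range indices, and is
    // the natural home for a duplicate check as well.
    self.project(&indices)
}
```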
##########
crates/fluss/src/record/arrow.rs:
##########
@@ -554,19 +589,135 @@ pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType {
}
}
+#[derive(Clone)]
pub struct ReadContext {
- arrow_schema: SchemaRef,
+ target_schema: SchemaRef,
+
+ projection: Option<Projection>,
+}
+
+#[derive(Clone)]
+struct Projection {
+ ordered_schema: SchemaRef,
+ projected_fields: Vec<usize>,
+
+ reordering_indexes: Vec<usize>,
+ reordering_needed: bool,
}
impl ReadContext {
pub fn new(arrow_schema: SchemaRef) -> ReadContext {
- ReadContext { arrow_schema }
+ ReadContext {
+ target_schema: arrow_schema,
+ projection: None,
+ }
}
- pub fn to_arrow_metadata(&self) -> Result<Vec<u8>> {
- let mut arrow_schema_bytes = vec![];
- let _writer = StreamWriter::try_new(&mut arrow_schema_bytes, &self.arrow_schema)?;
- Ok(arrow_schema_bytes)
+ pub fn with_projection_pushdown(
+ arrow_schema: SchemaRef,
+ projected_fields: Vec<usize>,
+ ) -> ReadContext {
+ let target_schema = Self::project_schema(arrow_schema.clone(), projected_fields.as_slice());
+ let mut sorted_fields = projected_fields.clone();
+ sorted_fields.sort_unstable();
+
+ let project = {
+ if !sorted_fields.eq(&projected_fields) {
+ // reordering is required
+ // Calculate reordering indexes to transform from sorted order to user-requested order
+ let mut reordering_indexes = Vec::with_capacity(projected_fields.len());
+ for &original_idx in &projected_fields {
+ let pos = sorted_fields
+ .binary_search(&original_idx)
+ .expect("projection index should exist in sorted list");
+ reordering_indexes.push(pos);
+ }
+ Projection {
+ ordered_schema: Self::project_schema(
+ arrow_schema.clone(),
+ sorted_fields.as_slice(),
+ ),
+ projected_fields,
+ reordering_indexes,
+ reordering_needed: true,
+ }
+ } else {
+ Projection {
+ ordered_schema: Self::project_schema(arrow_schema, projected_fields.as_slice()),
+ projected_fields,
+ reordering_indexes: vec![],
+ reordering_needed: false,
+ }
+ }
+ };
+
+ ReadContext {
+ target_schema,
+ projection: Some(project),
+ }
+ }
+
+ pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> SchemaRef {
+ // todo: handle the exception
+ SchemaRef::new(
+ schema
+ .project(projected_fields)
+ .expect("can't project schema"),
Review Comment:
The `expect` call will panic if schema projection fails. Since this handles
user-provided projection indices (which are already validated in
`TableScan::project`), the panic risk is low but could still occur due to
internal inconsistencies. Consider returning a proper error with context
about which indices failed; note that this changes the function to return
`Result<SchemaRef>`, so callers must propagate the error:
```rust
SchemaRef::new(
schema
.project(projected_fields)
.map_err(|e| Error::IllegalArgument(format!("Failed to project schema: {}", e)))?,
)
```
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -47,14 +50,82 @@ impl<'a> TableScan<'a> {
conn,
table_info,
metadata,
+ projected_fields: None,
}
}
- pub fn create_log_scanner(&self) -> LogScanner {
+ /// Projects the scan to only include specified columns by their indices.
+ ///
+ /// # Arguments
+ /// * `column_indices` - Zero-based indices of columns to include in the scan
+ ///
+ /// # Errors
+ /// Returns an error if `column_indices` is empty or if any column index is out of range.
+ ///
+ /// # Example
+ /// ```
+ /// let scanner = table.new_scan().project(&[0, 2, 3])?.create_log_scanner();
+ /// ```
+ pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
+ if column_indices.is_empty() {
+ return Err(Error::IllegalArgument(
+ "Column indices cannot be empty".to_string(),
+ ));
+ }
+ let field_count = self.table_info.row_type().fields().len();
+ for &idx in column_indices {
+ if idx >= field_count {
+ return Err(Error::IllegalArgument(format!(
+ "Column index {} out of range (max: {})",
+ idx,
+ field_count - 1
+ )));
+ }
+ }
+ self.projected_fields = Some(column_indices.to_vec());
+ Ok(self)
+ }
+
+ /// Projects the scan to only include specified columns by their names.
+ ///
+ /// # Arguments
+ /// * `column_names` - Names of columns to include in the scan
+ ///
+ /// # Errors
+ /// Returns an error if `column_names` is empty or if any column name is not found in the table schema.
+ ///
+ /// # Example
+ /// ```
+ /// let scanner = table.new_scan().project_by_name(&["col1", "col3"])?.create_log_scanner();
+ /// ```
+ pub fn project_by_name(mut self, column_names: &[&str]) -> Result<Self> {
+ if column_names.is_empty() {
+ return Err(Error::IllegalArgument(
+ "Column names cannot be empty".to_string(),
+ ));
+ }
+ let row_type = self.table_info.row_type();
+ let mut indices = Vec::new();
+
+ for name in column_names {
+ let idx = row_type
+ .fields()
+ .iter()
+ .position(|f| f.name() == *name)
+ .ok_or_else(|| Error::IllegalArgument(format!("Column '{name}' not found")))?;
+ indices.push(idx);
+ }
+
+ self.projected_fields = Some(indices);
+ Ok(self)
+ }
Review Comment:
The new column projection functionality lacks test coverage. There are no
tests for:
- `TableScan::project()` method
- `TableScan::project_by_name()` method
- `ReadContext::with_projection_pushdown()`
- The projection reordering logic in `ReadContext::record_batch()`
- Edge cases like duplicate columns, out-of-order projections, etc.
Since this repository has integration tests (see
`crates/fluss/tests/integration/table.rs`), consider adding tests for these new
features. For example:
```rust
#[tokio::test]
async fn test_column_projection() {
// Test that projecting specific columns returns only those columns
}

#[tokio::test]
async fn test_column_projection_by_name() {
// Test projection by column names
}
```
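Beyond integration tests, the projection bookkeeping in `ReadContext` is pure and can be unit-tested without a server. A minimal sketch (assuming the arrow-rs `Schema`/`Field`/`DataType` types and the accessors shown in this diff):
```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};

#[test]
fn test_projection_pushdown_keeps_user_order() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("c", DataType::Int64, false),
    ]));
    // Out-of-order projection: the user asks for column 2 first, then 0,
    // which should take the reordering path internally.
    let ctx = ReadContext::with_projection_pushdown(schema, vec![2, 0]);
    assert!(ctx.is_projection_pushdown());
    assert_eq!(ctx.project_fields(), Some(&[2usize, 0][..]));
}
```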
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]