Copilot commented on code in PR #57:
URL: https://github.com/apache/fluss-rust/pull/57#discussion_r2583929464
##########
crates/examples/src/example_table.rs:
##########
@@ -27,7 +27,7 @@ use tokio::try_join;
#[tokio::main]
pub async fn main() -> Result<()> {
let mut config = Config::parse();
- config.bootstrap_server = Some("127.0.0.1:56405".to_string());
+ config.bootstrap_server = Some("127.0.0.1:9123".to_string());
Review Comment:
Hardcoded bootstrap server changed from port 56405 to 9123. This change
appears to be unrelated to the column pruning feature and may have been
committed accidentally. If this is an intentional configuration change for
development/testing, it should be explained in the PR description or reverted
to avoid affecting other developers' local environments.
```suggestion
config.bootstrap_server = Some("127.0.0.1:56405".to_string());
```
##########
crates/examples/Cargo.toml:
##########
@@ -27,6 +27,7 @@ version = { workspace = true }
fluss = { workspace = true }
tokio = { workspace = true }
clap = { workspace = true}
+env_logger = "0.11"
Review Comment:
The `env_logger` dependency is added but never used in the examples crate.
This unused dependency should be removed unless it's needed for future examples
or debugging purposes. If it's meant for debugging, consider adding a comment
explaining its purpose or demonstrating its use in an example.
```suggestion
```
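If `env_logger` is intentionally retained for debugging, a hedged alternative to removal (sketch only, not part of the PR) is annotating the dependency so its purpose is explicit:

```toml
# Retained for ad-hoc debugging of examples; call `env_logger::init()` at the
# top of an example's main() and control verbosity with e.g. RUST_LOG=debug.
env_logger = "0.11"
```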
##########
crates/fluss/src/record/arrow.rs:
##########
@@ -554,19 +623,90 @@ pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType {
}
}
+#[derive(Clone)]
pub struct ReadContext {
arrow_schema: SchemaRef,
+ projected_fields: Option<Vec<usize>>,
+ projection_pushdown: bool,
+ projection_in_order: Option<Vec<usize>>,
}
impl ReadContext {
pub fn new(arrow_schema: SchemaRef) -> ReadContext {
- ReadContext { arrow_schema }
+ ReadContext {
+ arrow_schema,
+ projected_fields: None,
+ projection_pushdown: false,
+ projection_in_order: None,
+ }
}
- pub fn to_arrow_metadata(&self) -> Result<Vec<u8>> {
- let mut arrow_schema_bytes = vec![];
- let _writer = StreamWriter::try_new(&mut arrow_schema_bytes, &self.arrow_schema)?;
- Ok(arrow_schema_bytes)
+ pub fn with_projection(
+ arrow_schema: SchemaRef,
+ projected_fields: Option<Vec<usize>>,
+ ) -> ReadContext {
+ ReadContext {
+ arrow_schema,
+ projected_fields,
+ projection_pushdown: false,
+ projection_in_order: None,
+ }
+ }
+
+ pub fn with_projection_pushdown(
+ arrow_schema: SchemaRef,
+ projected_fields: Vec<usize>,
+ projection_in_order: Vec<usize>,
+ ) -> ReadContext {
+ ReadContext {
+ arrow_schema,
+ projected_fields: Some(projected_fields),
+ projection_pushdown: true,
+ projection_in_order: Some(projection_in_order),
+ }
+ }
+
+ pub fn is_projection_pushdown(&self) -> bool {
Review Comment:
The `with_projection` constructor and `projected_fields()` getter method
appear to be unused in the codebase. The only constructor being used is
`with_projection_pushdown()`. If `with_projection` is intended for future use
or as a public API, it should be documented. Otherwise, consider removing it to
reduce code complexity. The `projected_fields()` getter is also unused and
could be removed.
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -47,14 +50,82 @@ impl<'a> TableScan<'a> {
conn,
table_info,
metadata,
+ projected_fields: None,
}
}
- pub fn create_log_scanner(&self) -> LogScanner {
+ /// Projects the scan to only include specified columns by their indices.
+ ///
+ /// # Arguments
+ /// * `column_indices` - Zero-based indices of columns to include in the scan
+ ///
+ /// # Errors
+ /// Returns an error if `column_indices` is empty or if any column index is out of range.
+ ///
+ /// # Example
+ /// ```
+ /// let scanner = table.new_scan().project(&[0, 2, 3])?.create_log_scanner();
+ /// ```
+ pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
+ if column_indices.is_empty() {
+ return Err(Error::IllegalArgument(
+ "Column indices cannot be empty".to_string(),
+ ));
+ }
+ let field_count = self.table_info.row_type().fields().len();
+ for &idx in column_indices {
+ if idx >= field_count {
+ return Err(Error::IllegalArgument(format!(
+ "Column index {} out of range (max: {})",
+ idx,
+ field_count - 1
+ )));
+ }
+ }
+ self.projected_fields = Some(column_indices.to_vec());
+ Ok(self)
+ }
+
+ /// Projects the scan to only include specified columns by their names.
+ ///
+ /// # Arguments
+ /// * `column_names` - Names of columns to include in the scan
+ ///
+ /// # Errors
+ /// Returns an error if `column_names` is empty or if any column name is not found in the table schema.
+ ///
+ /// # Example
+ /// ```
+ /// let scanner = table.new_scan().project_by_name(&["col1", "col3"])?.create_log_scanner();
+ /// ```
+ pub fn project_by_name(mut self, column_names: &[&str]) -> Result<Self> {
+ if column_names.is_empty() {
+ return Err(Error::IllegalArgument(
+ "Column names cannot be empty".to_string(),
+ ));
+ }
+ let row_type = self.table_info.row_type();
+ let mut indices = Vec::new();
+
+ for name in column_names {
+ let idx = row_type
+ .fields()
Review Comment:
The new column projection methods (`project()` and `project_by_name()`) lack
test coverage. Given that this PR introduces a significant new feature for
column pruning, unit tests or integration tests should be added to verify:
- Projection with valid column indices/names
- Error handling for empty column lists
- Error handling for out-of-range indices
- Error handling for non-existent column names
- Column reordering behavior when projection order differs from schema order
Tests should be added to ensure the projection pushdown functionality works
correctly end-to-end.
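As a starting point, the index-validation rules can be exercised without a running cluster. The sketch below uses a hypothetical standalone helper (`validate_projection`, not part of the PR's API) that mirrors the checks in `project()`; integration tests against a real table would still be needed for the end-to-end pushdown behavior.

```rust
// Hypothetical helper mirroring the validation in `project()`, extracted so
// the error cases can be unit-tested in isolation.
fn validate_projection(column_indices: &[usize], field_count: usize) -> Result<(), String> {
    if column_indices.is_empty() {
        return Err("Column indices cannot be empty".to_string());
    }
    for &idx in column_indices {
        if idx >= field_count {
            return Err(format!(
                "Column index {} out of range (max: {})",
                idx,
                field_count - 1
            ));
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rejects_empty_and_out_of_range_projections() {
        assert!(validate_projection(&[], 3).is_err()); // empty list
        assert!(validate_projection(&[3], 3).is_err()); // index == field_count
        assert!(validate_projection(&[0, 2], 3).is_ok()); // valid subset
    }
}
```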
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -47,14 +50,82 @@ impl<'a> TableScan<'a> {
conn,
table_info,
metadata,
+ projected_fields: None,
}
}
- pub fn create_log_scanner(&self) -> LogScanner {
+ /// Projects the scan to only include specified columns by their indices.
+ ///
+ /// # Arguments
+ /// * `column_indices` - Zero-based indices of columns to include in the scan
+ ///
+ /// # Errors
+ /// Returns an error if `column_indices` is empty or if any column index is out of range.
+ ///
+ /// # Example
+ /// ```
+ /// let scanner = table.new_scan().project(&[0, 2, 3])?.create_log_scanner();
+ /// ```
+ pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
+ if column_indices.is_empty() {
+ return Err(Error::IllegalArgument(
+ "Column indices cannot be empty".to_string(),
+ ));
+ }
+ let field_count = self.table_info.row_type().fields().len();
+ for &idx in column_indices {
+ if idx >= field_count {
+ return Err(Error::IllegalArgument(format!(
+ "Column index {} out of range (max: {})",
+ idx,
+ field_count - 1
+ )));
+ }
+ }
+ self.projected_fields = Some(column_indices.to_vec());
+ Ok(self)
+ }
+
+ /// Projects the scan to only include specified columns by their names.
+ ///
+ /// # Arguments
+ /// * `column_names` - Names of columns to include in the scan
+ ///
+ /// # Errors
+ /// Returns an error if `column_names` is empty or if any column name is not found in the table schema.
+ ///
+ /// # Example
+ /// ```
+ /// let scanner = table.new_scan().project_by_name(&["col1", "col3"])?.create_log_scanner();
+ /// ```
+ pub fn project_by_name(mut self, column_names: &[&str]) -> Result<Self> {
+ if column_names.is_empty() {
+ return Err(Error::IllegalArgument(
+ "Column names cannot be empty".to_string(),
+ ));
+ }
+ let row_type = self.table_info.row_type();
+ let mut indices = Vec::new();
+
+ for name in column_names {
+ let idx = row_type
+ .fields()
+ .iter()
+ .position(|f| f.name() == *name)
Review Comment:
[nitpick] The builder pattern breaks when calling `project()` or
`project_by_name()` since these methods return `Result<Self>`. This creates an
inconsistency with `create_log_scanner()` which consumes `self` without
returning a `Result`. Consider one of these approaches for better API
ergonomics:
1. **Return Self and panic on invalid input** (following builder pattern
conventions):
```rust
pub fn project(mut self, column_indices: &[usize]) -> Self {
assert!(!column_indices.is_empty(), "Column indices cannot be empty");
// ... validation with asserts/panics
self
}
```
2. **Make create_log_scanner() also return Result** (consistent error
handling):
```rust
pub fn create_log_scanner(self) -> Result<LogScanner> {
// validate state here if needed
Ok(LogScanner::new(...))
}
```
Option 2 is recommended as it provides better error handling and aligns with
Rust best practices for fallible operations.
##########
crates/fluss/src/record/arrow.rs:
##########
@@ -554,19 +623,90 @@ pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType {
}
}
+#[derive(Clone)]
pub struct ReadContext {
arrow_schema: SchemaRef,
+ projected_fields: Option<Vec<usize>>,
+ projection_pushdown: bool,
+ projection_in_order: Option<Vec<usize>>,
}
impl ReadContext {
pub fn new(arrow_schema: SchemaRef) -> ReadContext {
- ReadContext { arrow_schema }
+ ReadContext {
+ arrow_schema,
+ projected_fields: None,
+ projection_pushdown: false,
+ projection_in_order: None,
+ }
}
- pub fn to_arrow_metadata(&self) -> Result<Vec<u8>> {
- let mut arrow_schema_bytes = vec![];
- let _writer = StreamWriter::try_new(&mut arrow_schema_bytes, &self.arrow_schema)?;
- Ok(arrow_schema_bytes)
+ pub fn with_projection(
+ arrow_schema: SchemaRef,
+ projected_fields: Option<Vec<usize>>,
+ ) -> ReadContext {
+ ReadContext {
+ arrow_schema,
+ projected_fields,
+ projection_pushdown: false,
+ projection_in_order: None,
+ }
+ }
+
+ pub fn with_projection_pushdown(
+ arrow_schema: SchemaRef,
+ projected_fields: Vec<usize>,
+ projection_in_order: Vec<usize>,
+ ) -> ReadContext {
+ ReadContext {
+ arrow_schema,
+ projected_fields: Some(projected_fields),
+ projection_pushdown: true,
+ projection_in_order: Some(projection_in_order),
+ }
+ }
+
+ pub fn is_projection_pushdown(&self) -> bool {
+ self.projection_pushdown
+ }
+
+ pub fn projected_fields(&self) -> Option<&[usize]> {
+ self.projected_fields.as_deref()
+ }
+
+ pub fn projection_in_order(&self) -> Option<&[usize]> {
+ self.projection_in_order.as_deref()
+ }
+
+ pub fn reordering_indexes(&self) -> Option<Vec<usize>> {
+ if !self.projection_pushdown {
+ return None;
+ }
+
+ let projected_fields = match &self.projected_fields {
+ Some(fields) => fields,
+ None => return None,
+ };
+
+ let projection_in_order = match &self.projection_in_order {
+ Some(order) => order,
+ None => return None,
+ };
+
+ if projected_fields.is_empty() {
+ return None;
+ }
+
+ // Calculate reordering indexes to transform from sorted order to user-requested order
Review Comment:
The `.expect()` call could panic if there's a mismatch between
`projected_fields` and `projection_in_order`. While this should never happen
with the current code logic (since both are derived from the same source in
`create_read_context`), this represents a potentially unrecoverable error
condition. Consider:
1. Adding debug assertions to validate invariants in
`with_projection_pushdown()`:
```rust
debug_assert_eq!(projected_fields.len(), projection_in_order.len());
debug_assert!(projected_fields.iter().all(|&f| projection_in_order.contains(&f)));
```
2. Or, returning a `Result` from `reordering_indexes()` instead of using
`.expect()` to make error handling more explicit and recoverable.
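For option 2, the reordering computation itself can be written to surface a mismatch instead of panicking. A standalone sketch (hypothetical free function, using `Option` in place of the crate's `Result`):

```rust
// Map each user-requested column to its position in the sorted (server-side)
// order. Any mismatch between the two lists yields None instead of a panic
// via `.expect()`.
fn reordering_indexes(sorted_fields: &[usize], user_order: &[usize]) -> Option<Vec<usize>> {
    user_order
        .iter()
        .map(|f| sorted_fields.iter().position(|s| s == f))
        .collect() // collects to Option<Vec<_>>: one missing field -> None
}
```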
##########
crates/fluss/src/record/arrow.rs:
##########
@@ -472,38 +477,102 @@ impl<'a> LogRecordBatch<'a> {
LittleEndian::read_i32(&self.data[offset..offset + RECORDS_COUNT_LENGTH])
}
- pub fn records(&self, read_context: ReadContext) -> LogRecordIterator {
- let count = self.record_count();
- if count == 0 {
- return LogRecordIterator::empty();
+ pub fn records(&self, read_context: &ReadContext) -> Result<LogRecordIterator> {
+ if self.record_count() == 0 {
+ return Ok(LogRecordIterator::empty());
}
- // get arrow_metadata
- let arrow_metadata_bytes = read_context.to_arrow_metadata().unwrap();
- // arrow_batch_data
let data = &self.data[RECORDS_OFFSET..];
+ let (batch_metadata, body_buffer, version) = match Self::parse_ipc_message(data) {
+ Some(result) => result,
+ None => return Ok(LogRecordIterator::empty()),
+ };
- // need to combine arrow_metadata_bytes + arrow_batch_data
- let cursor = Cursor::new([&arrow_metadata_bytes, data].concat());
- let mut stream_reader = StreamReader::try_new(cursor, None).unwrap();
+ let reordering_indexes_opt = if read_context.is_projection_pushdown() {
+ read_context.reordering_indexes()
+ } else {
+ None
+ };
- let mut record_batch = None;
- if let Some(bath) = stream_reader.next() {
- record_batch = Some(bath.unwrap());
- }
+ let schema_to_use = read_context.arrow_schema.clone();
- if record_batch.is_none() {
- return LogRecordIterator::empty();
- }
+ let record_batch = read_record_batch(
+ &body_buffer,
+ batch_metadata,
+ schema_to_use,
+ &std::collections::HashMap::new(),
+ None,
+ &version,
+ )?;
- let arrow_reader = ArrowReader::new(Arc::new(record_batch.unwrap()));
- LogRecordIterator::Arrow(ArrowLogRecordIterator {
+ // Reorder columns if needed (when projection pushdown with non-sorted order)
+ let record_batch = if let Some(reordering_indexes) = &reordering_indexes_opt {
+ let reordered_columns: Vec<_> = reordering_indexes
+ .iter()
+ .map(|&idx| record_batch.column(idx).clone())
+ .collect();
+ let reordered_fields: Vec<_> = reordering_indexes
+ .iter()
+ .map(|&idx| record_batch.schema().field(idx).clone())
+ .collect();
+ let reordered_schema = Arc::new(arrow_schema::Schema::new(reordered_fields));
+ RecordBatch::try_new(reordered_schema, reordered_columns)?
+ } else {
+ record_batch
+ };
+
+ let arrow_reader = ArrowReader::new(Arc::new(record_batch));
+ Ok(LogRecordIterator::Arrow(ArrowLogRecordIterator {
reader: arrow_reader,
base_offset: self.base_log_offset(),
timestamp: self.commit_timestamp(),
row_id: 0,
change_type: ChangeType::AppendOnly,
- })
+ }))
+ }
+
+ /// Parse an Arrow IPC message from a byte slice.
+ ///
+ /// Server returns RecordBatch message (without Schema message) in the encapsulated message format.
+ /// Format: [continuation: 4 bytes (0xFFFFFFFF)][metadata_size: 4 bytes][RecordBatch metadata][body]
+ ///
+ /// This format is documented at:
+ /// https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
+ ///
+ /// Returns the RecordBatch metadata, body buffer, and metadata version.
+ fn parse_ipc_message(
+ data: &'a [u8],
+ ) -> Option<(
+ arrow::ipc::RecordBatch<'a>,
+ Buffer,
+ arrow::ipc::MetadataVersion,
+ )> {
+ const CONTINUATION_MARKER: u32 = 0xFFFFFFFF;
+
+ if data.len() < 8 {
+ return None;
+ }
+
+ let continuation = LittleEndian::read_u32(&data[0..4]);
+ let metadata_size = LittleEndian::read_u32(&data[4..8]) as usize;
+
+ if continuation != CONTINUATION_MARKER {
+ return None;
+ }
+
+ if data.len() < 8 + metadata_size {
+ return None;
+ }
+
+ let metadata_bytes = &data[8..8 + metadata_size];
+ let message = root_as_message(metadata_bytes).ok()?;
+ let batch_metadata = message.header_as_record_batch()?;
+
+ let body_start = 8 + metadata_size;
Review Comment:
Potential integer overflow when calculating buffer size. If `metadata_size`
is close to `usize::MAX`, the addition `8 + metadata_size` could overflow.
While this is unlikely with real data, it's better to use checked arithmetic
for robustness:
```rust
let total_size = 8_usize.checked_add(metadata_size)
.filter(|&size| size <= data.len())?;
let metadata_bytes = &data[8..total_size];
```
This avoids a potential overflow panic in debug builds (or a silent wrap-around
in release builds; integer overflow is not undefined behavior in Rust) if
malformed data is received.
```suggestion
let total_size = 8_usize.checked_add(metadata_size)?;
if data.len() < total_size {
return None;
}
let metadata_bytes = &data[8..total_size];
let message = root_as_message(metadata_bytes).ok()?;
let batch_metadata = message.header_as_record_batch()?;
let body_start = total_size;
```
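The header handling in the suggestion can be exercised in isolation against a hand-built frame. Below is a hedged, self-contained sketch (`split_ipc_frame` is a hypothetical helper; flatbuffers metadata decoding is elided) demonstrating the checked arithmetic:

```rust
/// Split an encapsulated Arrow IPC frame into (metadata, body) slices.
/// Layout: [0xFFFFFFFF continuation][u32 LE metadata_size][metadata][body].
fn split_ipc_frame(data: &[u8]) -> Option<(&[u8], &[u8])> {
    const CONTINUATION_MARKER: u32 = 0xFFFF_FFFF;
    if data.len() < 8 {
        return None;
    }
    let continuation = u32::from_le_bytes(data[0..4].try_into().ok()?);
    if continuation != CONTINUATION_MARKER {
        return None;
    }
    let metadata_size = u32::from_le_bytes(data[4..8].try_into().ok()?) as usize;
    // Checked addition: a malformed size yields None rather than overflowing.
    let total = 8usize.checked_add(metadata_size)?;
    if data.len() < total {
        return None;
    }
    Some((&data[8..total], &data[total..]))
}
```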
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -122,13 +196,41 @@ impl LogFetcher {
conns: Arc<RpcClient>,
metadata: Arc<Metadata>,
log_scanner_status: Arc<LogScannerStatus>,
+ projected_fields: Option<Vec<usize>>,
) -> Self {
+ let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
+ let read_context = Self::create_read_context(full_arrow_schema, &projected_fields);
LogFetcher {
table_path: table_info.table_path.clone(),
- conns: conns.clone(),
- table_info: table_info.clone(),
- metadata: metadata.clone(),
- log_scanner_status: log_scanner_status.clone(),
+ conns,
+ table_info,
+ metadata,
+ log_scanner_status,
+ read_context,
+ }
+ }
+
+ fn create_read_context(
+ full_arrow_schema: SchemaRef,
+ projected_fields: &Option<Vec<usize>>,
+ ) -> ReadContext {
+ match projected_fields {
+ None => ReadContext::new(full_arrow_schema),
+ Some(fields) => {
+ let mut sorted_fields = fields.clone();
+ sorted_fields.sort();
+ let projected_schema = arrow_schema::Schema::new(
+ sorted_fields
Review Comment:
Multiple `.clone()` operations on `fields` and `sorted_fields` could be
optimized. Since `projected_fields` is consumed after this point (the
`TableScan` is moved into `create_log_scanner`), you could avoid cloning by
restructuring the code:
```rust
Some(fields) => {
let mut sorted_fields = fields.clone();
sorted_fields.sort_unstable(); // unstable sort is faster when order of equal elements doesn't matter
let projected_schema = arrow_schema::Schema::new(
sorted_fields
.iter()
.map(|&idx| full_arrow_schema.field(idx).clone())
.collect::<Vec<_>>(),
);
ReadContext::with_projection_pushdown(
Arc::new(projected_schema),
fields, // move instead of clone
sorted_fields,
)
}
```
Note: Also use `sort_unstable()` instead of `sort()` for better performance
since we only care about ordering, not stability.
##########
crates/fluss/src/record/arrow.rs:
##########
@@ -554,19 +623,90 @@ pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType {
}
}
+#[derive(Clone)]
pub struct ReadContext {
arrow_schema: SchemaRef,
+ projected_fields: Option<Vec<usize>>,
+ projection_pushdown: bool,
+ projection_in_order: Option<Vec<usize>>,
}
impl ReadContext {
pub fn new(arrow_schema: SchemaRef) -> ReadContext {
- ReadContext { arrow_schema }
+ ReadContext {
+ arrow_schema,
+ projected_fields: None,
+ projection_pushdown: false,
+ projection_in_order: None,
+ }
}
- pub fn to_arrow_metadata(&self) -> Result<Vec<u8>> {
- let mut arrow_schema_bytes = vec![];
- let _writer = StreamWriter::try_new(&mut arrow_schema_bytes, &self.arrow_schema)?;
- Ok(arrow_schema_bytes)
+ pub fn with_projection(
+ arrow_schema: SchemaRef,
+ projected_fields: Option<Vec<usize>>,
+ ) -> ReadContext {
+ ReadContext {
+ arrow_schema,
+ projected_fields,
+ projection_pushdown: false,
+ projection_in_order: None,
+ }
+ }
+
+ pub fn with_projection_pushdown(
+ arrow_schema: SchemaRef,
+ projected_fields: Vec<usize>,
+ projection_in_order: Vec<usize>,
+ ) -> ReadContext {
+ ReadContext {
+ arrow_schema,
+ projected_fields: Some(projected_fields),
+ projection_pushdown: true,
+ projection_in_order: Some(projection_in_order),
+ }
Review Comment:
[nitpick] Extra blank line between methods. This creates inconsistent spacing
in the implementation block compared to the other methods, which are separated
by a single blank line.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]