QuakeWang commented on code in PR #387:
URL: https://github.com/apache/paimon-rust/pull/387#discussion_r3417900003


##########
crates/paimon/Cargo.toml:
##########


Review Comment:
   nit: `kanal` appears to be unused now that the Vortex writer no longer uses 
the streaming path. Could we remove it from the `vortex` feature and the 
dependency list?



##########
crates/paimon/src/arrow/format/vortex.rs:
##########
@@ -278,11 +300,533 @@ fn as_single_row_range(ranges: &[RowRange], total_rows: u64) -> Option<std::ops:
     None
 }
 
+fn build_vortex_scan_fields(
+    read_fields: &[DataField],
+    predicates: Option<&FilePredicates>,
+) -> Vec<DataField> {
+    let mut fields = read_fields.to_vec();
+
+    if let Some(fp) = predicates {
+        let mut predicate_indices = Vec::new();
+        for predicate in &fp.predicates {
+            collect_predicate_field_indices(predicate, &mut predicate_indices);
+        }
+        for index in predicate_indices {
+            if let Some(field) = fp.file_fields.get(index) {
+                push_unique_scan_field(&mut fields, field);
+            }
+        }
+    }
+
+    fields
+}
+
+fn collect_predicate_field_indices(predicate: &Predicate, indices: &mut 
Vec<usize>) {
+    match predicate {
+        Predicate::Leaf { index, .. } => indices.push(*index),
+        Predicate::And(children) | Predicate::Or(children) => {
+            for child in children {
+                collect_predicate_field_indices(child, indices);
+            }
+        }
+        Predicate::Not(inner) => collect_predicate_field_indices(inner, 
indices),
+        Predicate::AlwaysTrue | Predicate::AlwaysFalse => {}
+    }
+}
+
+fn push_unique_scan_field(fields: &mut Vec<DataField>, field: &DataField) {
+    if !fields
+        .iter()
+        .any(|existing| same_data_field(existing, field))
+    {
+        fields.push(field.clone());
+    }
+}
+
+fn same_data_field(left: &DataField, right: &DataField) -> bool {
+    left.id() == right.id() || left.name() == right.name()
+}
+
+fn filter_and_project_batch(
+    batch: RecordBatch,
+    target_schema: &SchemaRef,
+    read_fields: &[DataField],
+    scan_fields: &[DataField],
+    predicates: Option<&FilePredicates>,
+) -> crate::Result<RecordBatch> {
+    let filtered = match predicates {
+        Some(fp) => filter_record_batch_by_predicates(batch, fp, scan_fields)?,
+        None => batch,
+    };
+
+    if read_fields.is_empty() {
+        return RecordBatch::try_new_with_options(
+            target_schema.clone(),
+            vec![],
+            
&arrow_array::RecordBatchOptions::new().with_row_count(Some(filtered.num_rows())),
+        )
+        .map_err(|e| Error::DataInvalid {
+            message: format!("Failed to build projected empty RecordBatch: 
{e}"),
+            source: None,
+        });
+    }
+
+    let columns = projection_indices(read_fields, scan_fields)?
+        .into_iter()
+        .map(|index| filtered.column(index).clone())
+        .collect::<Vec<_>>();
+
+    RecordBatch::try_new(target_schema.clone(), columns).map_err(|e| 
Error::DataInvalid {
+        message: format!("Failed to project Vortex RecordBatch: {e}"),
+        source: None,
+    })
+}
+
+fn projection_indices(
+    read_fields: &[DataField],
+    scan_fields: &[DataField],
+) -> crate::Result<Vec<usize>> {
+    read_fields
+        .iter()
+        .map(|field| {
+            scan_fields
+                .iter()
+                .position(|scan_field| same_data_field(scan_field, field))
+                .ok_or_else(|| Error::DataInvalid {
+                    message: format!(
+                        "Projected Vortex field {} was not included in the 
scan",
+                        field.name()
+                    ),
+                    source: None,
+                })
+        })
+        .collect()
+}
+
+fn filter_record_batch_by_predicates(
+    batch: RecordBatch,
+    predicates: &FilePredicates,
+    scan_fields: &[DataField],
+) -> crate::Result<RecordBatch> {
+    let Some(mask) = evaluate_predicates_mask(
+        &batch,
+        &predicates.predicates,
+        &predicates.file_fields,
+        scan_fields,
+    )?
+    else {
+        return Ok(batch);
+    };
+
+    arrow_select::filter::filter_record_batch(&batch, &mask).map_err(|e| 
Error::DataInvalid {
+        message: format!("Failed to filter Vortex RecordBatch: {e}"),
+        source: Some(Box::new(e)),
+    })
+}
+
+fn evaluate_predicates_mask(
+    batch: &RecordBatch,
+    predicates: &[Predicate],
+    file_fields: &[DataField],
+    scan_fields: &[DataField],
+) -> crate::Result<Option<BooleanArray>> {
+    let mut combined = None;
+    for predicate in predicates {
+        let Some(mask) = evaluate_predicate_mask(batch, predicate, 
file_fields, scan_fields)?
+        else {
+            continue;
+        };
+        combined = Some(match combined {
+            Some(existing) => combine_filter_masks(&existing, &mask, false),
+            None => mask,
+        });
+    }
+    Ok(combined)
+}
+
+fn evaluate_predicate_mask(
+    batch: &RecordBatch,
+    predicate: &Predicate,
+    file_fields: &[DataField],
+    scan_fields: &[DataField],
+) -> crate::Result<Option<BooleanArray>> {
+    match predicate {
+        Predicate::AlwaysTrue => Ok(Some(BooleanArray::from(vec![true; 
batch.num_rows()]))),
+        Predicate::AlwaysFalse => Ok(Some(BooleanArray::from(vec![false; 
batch.num_rows()]))),
+        Predicate::And(children) => {
+            let mut combined = None;
+            for child in children {
+                let Some(mask) = evaluate_predicate_mask(batch, child, 
file_fields, scan_fields)?
+                else {
+                    continue;
+                };
+                combined = Some(match combined {
+                    Some(existing) => combine_filter_masks(&existing, &mask, 
false),
+                    None => mask,
+                });
+            }
+            Ok(combined)
+        }
+        Predicate::Or(children) => {
+            let mut combined = BooleanArray::from(vec![false; 
batch.num_rows()]);
+            for child in children {
+                let Some(mask) = evaluate_predicate_mask(batch, child, 
file_fields, scan_fields)?
+                else {
+                    return Ok(None);
+                };
+                combined = combine_filter_masks(&combined, &mask, true);
+            }
+            Ok(Some(combined))
+        }
+        Predicate::Not(inner) => {
+            let Some(mask) = evaluate_predicate_mask(batch, inner, 
file_fields, scan_fields)?
+            else {
+                return Ok(None);
+            };
+            Ok(Some(boolean_mask_from_predicate(mask.len(), |row_index| {
+                !mask.value(row_index)
+            })))
+        }
+        Predicate::Leaf {
+            index,
+            op,
+            literals,
+            ..
+        } => {
+            let Some(file_field) = file_fields.get(*index) else {
+                return Ok(None);
+            };
+            let Some(scan_index) = scan_fields
+                .iter()
+                .position(|scan_field| same_data_field(scan_field, file_field))
+            else {
+                return Ok(None);
+            };
+            evaluate_arrow_leaf_predicate(
+                batch.column(scan_index),
+                file_field.data_type(),
+                *op,
+                literals,
+            )
+        }
+    }
+}
+
+fn evaluate_arrow_leaf_predicate(
+    array: &ArrowArrayRef,
+    data_type: &DataType,
+    op: PredicateOperator,
+    literals: &[Datum],
+) -> crate::Result<Option<BooleanArray>> {
+    match op {
+        PredicateOperator::IsNull => Ok(Some(boolean_mask_from_predicate(
+            array.len(),
+            |row_index| array.is_null(row_index),
+        ))),
+        PredicateOperator::IsNotNull => Ok(Some(boolean_mask_from_predicate(
+            array.len(),
+            |row_index| array.is_valid(row_index),
+        ))),
+        PredicateOperator::In | PredicateOperator::NotIn => {
+            evaluate_set_membership_predicate(array, data_type, op, literals)
+        }
+        PredicateOperator::Eq
+        | PredicateOperator::NotEq
+        | PredicateOperator::Lt
+        | PredicateOperator::LtEq
+        | PredicateOperator::Gt
+        | PredicateOperator::GtEq => {
+            let Some(literal) = literals.first() else {
+                return Ok(None);
+            };
+            let Some(scalar) = literal_scalar_for_arrow_filter(literal, 
data_type)? else {
+                return Ok(None);
+            };
+            let mask =
+                evaluate_column_predicate(array, &scalar, op).map_err(|e| 
Error::DataInvalid {
+                    message: format!("Failed to evaluate Vortex predicate: 
{e}"),
+                    source: Some(Box::new(e)),
+                })?;
+            Ok(Some(sanitize_filter_mask(mask)))
+        }
+    }
+}
+
+fn evaluate_set_membership_predicate(
+    array: &ArrowArrayRef,
+    data_type: &DataType,
+    op: PredicateOperator,
+    literals: &[Datum],
+) -> crate::Result<Option<BooleanArray>> {
+    if literals.is_empty() {
+        return Ok(Some(match op {
+            PredicateOperator::In => BooleanArray::from(vec![false; 
array.len()]),
+            PredicateOperator::NotIn => {
+                boolean_mask_from_predicate(array.len(), |row_index| 
array.is_valid(row_index))
+            }
+            _ => unreachable!(),
+        }));
+    }
+
+    let mut combined = match op {
+        PredicateOperator::In => BooleanArray::from(vec![false; array.len()]),
+        PredicateOperator::NotIn => {
+            boolean_mask_from_predicate(array.len(), |row_index| 
array.is_valid(row_index))
+        }
+        _ => unreachable!(),
+    };
+
+    for literal in literals {
+        let Some(scalar) = literal_scalar_for_arrow_filter(literal, 
data_type)? else {
+            return Ok(None);
+        };
+        let comparison_op = match op {
+            PredicateOperator::In => PredicateOperator::Eq,
+            PredicateOperator::NotIn => PredicateOperator::NotEq,
+            _ => unreachable!(),
+        };
+        let mask = evaluate_column_predicate(array, &scalar, 
comparison_op).map_err(|e| {
+            Error::DataInvalid {
+                message: format!("Failed to evaluate Vortex set predicate: 
{e}"),
+                source: Some(Box::new(e)),
+            }
+        })?;
+        let mask = sanitize_filter_mask(mask);
+        combined = combine_filter_masks(&combined, &mask, matches!(op, 
PredicateOperator::In));
+    }
+
+    Ok(Some(combined))
+}
+
+fn evaluate_column_predicate(
+    column: &ArrowArrayRef,
+    scalar: &Scalar<ArrowArrayRef>,
+    op: PredicateOperator,
+) -> Result<BooleanArray, ArrowError> {
+    match op {
+        PredicateOperator::Eq => arrow_eq(column, scalar),
+        PredicateOperator::NotEq => arrow_neq(column, scalar),
+        PredicateOperator::Lt => arrow_lt(column, scalar),
+        PredicateOperator::LtEq => arrow_lt_eq(column, scalar),
+        PredicateOperator::Gt => arrow_gt(column, scalar),
+        PredicateOperator::GtEq => arrow_gt_eq(column, scalar),
+        PredicateOperator::IsNull
+        | PredicateOperator::IsNotNull
+        | PredicateOperator::In
+        | PredicateOperator::NotIn => Ok(BooleanArray::new_null(column.len())),
+    }
+}
+
+fn sanitize_filter_mask(mask: BooleanArray) -> BooleanArray {
+    if mask.null_count() == 0 {
+        return mask;
+    }
+
+    boolean_mask_from_predicate(mask.len(), |row_index| {
+        mask.is_valid(row_index) && mask.value(row_index)
+    })
+}
+
+fn combine_filter_masks(left: &BooleanArray, right: &BooleanArray, use_or: 
bool) -> BooleanArray {
+    debug_assert_eq!(left.len(), right.len());
+    boolean_mask_from_predicate(left.len(), |row_index| {
+        if use_or {
+            left.value(row_index) || right.value(row_index)
+        } else {
+            left.value(row_index) && right.value(row_index)
+        }
+    })
+}
+
+fn boolean_mask_from_predicate(
+    len: usize,
+    mut predicate: impl FnMut(usize) -> bool,
+) -> BooleanArray {
+    BooleanArray::from((0..len).map(&mut predicate).collect::<Vec<_>>())
+}
+
+fn literal_scalar_for_arrow_filter(
+    literal: &Datum,
+    file_data_type: &DataType,
+) -> crate::Result<Option<Scalar<ArrowArrayRef>>> {
+    let array: ArrowArrayRef = match file_data_type {
+        DataType::Boolean(_) => match literal {
+            Datum::Bool(value) => 
Arc::new(BooleanArray::new_scalar(*value).into_inner()),
+            _ => return Ok(None),
+        },
+        DataType::TinyInt(_) => {
+            match integer_literal(literal).and_then(|value| 
i8::try_from(value).ok()) {
+                Some(value) => 
Arc::new(Int8Array::new_scalar(value).into_inner()),
+                None => return Ok(None),
+            }
+        }
+        DataType::SmallInt(_) => {
+            match integer_literal(literal).and_then(|value| 
i16::try_from(value).ok()) {
+                Some(value) => 
Arc::new(Int16Array::new_scalar(value).into_inner()),
+                None => return Ok(None),
+            }
+        }
+        DataType::Int(_) => {
+            match integer_literal(literal).and_then(|value| 
i32::try_from(value).ok()) {
+                Some(value) => 
Arc::new(Int32Array::new_scalar(value).into_inner()),
+                None => return Ok(None),
+            }
+        }
+        DataType::BigInt(_) => {
+            match integer_literal(literal).and_then(|value| 
i64::try_from(value).ok()) {
+                Some(value) => 
Arc::new(Int64Array::new_scalar(value).into_inner()),
+                None => return Ok(None),
+            }
+        }
+        DataType::Float(_) => match float32_literal(literal) {
+            Some(value) => 
Arc::new(Float32Array::new_scalar(value).into_inner()),
+            None => return Ok(None),
+        },
+        DataType::Double(_) => match float64_literal(literal) {
+            Some(value) => 
Arc::new(Float64Array::new_scalar(value).into_inner()),
+            None => return Ok(None),
+        },
+        DataType::Char(_) | DataType::VarChar(_) => match literal {
+            Datum::String(value) => 
Arc::new(StringArray::new_scalar(value.as_str()).into_inner()),
+            _ => return Ok(None),
+        },
+        DataType::Binary(_) | DataType::VarBinary(_) | DataType::Blob(_) => 
match literal {
+            Datum::Bytes(value) => 
Arc::new(BinaryArray::new_scalar(value.as_slice()).into_inner()),
+            _ => return Ok(None),
+        },
+        DataType::Date(_) => match literal {
+            Datum::Date(value) => 
Arc::new(Date32Array::new_scalar(*value).into_inner()),
+            _ => return Ok(None),
+        },
+        DataType::Time(_) => match literal {
+            Datum::Time(value) => 
Arc::new(Time32MillisecondArray::new_scalar(*value).into_inner()),
+            _ => return Ok(None),
+        },
+        DataType::Timestamp(ts) => match literal {
+            Datum::Timestamp { millis, nanos } => {
+                let Some(array) = timestamp_scalar(*millis, *nanos, 
ts.precision(), None)? else {
+                    return Ok(None);
+                };
+                array
+            }
+            _ => return Ok(None),
+        },
+        DataType::LocalZonedTimestamp(ts) => match literal {
+            Datum::LocalZonedTimestamp { millis, nanos } => {
+                let Some(array) = timestamp_scalar(*millis, *nanos, 
ts.precision(), Some("UTC"))?
+                else {
+                    return Ok(None);
+                };
+                array
+            }
+            _ => return Ok(None),
+        },
+        DataType::Decimal(decimal) => match literal {
+            Datum::Decimal {
+                unscaled,
+                precision,
+                scale,
+            } if *precision <= decimal.precision() && *scale == 
decimal.scale() => {
+                let precision =
+                    u8::try_from(decimal.precision()).map_err(|_| 
Error::Unsupported {
+                        message: "Decimal precision exceeds Arrow decimal128 
range".to_string(),
+                    })?;
+                let scale =
+                    i8::try_from(decimal.scale() as i32).map_err(|_| 
Error::Unsupported {
+                        message: "Decimal scale exceeds Arrow decimal128 
range".to_string(),
+                    })?;
+                Arc::new(
+                    Decimal128Array::new_scalar(*unscaled)
+                        .into_inner()
+                        .with_precision_and_scale(precision, scale)
+                        .map_err(|e| Error::UnexpectedError {
+                            message: format!(
+                                "Failed to build decimal scalar for Vortex row 
filter: {e}"
+                            ),
+                            source: Some(Box::new(e)),
+                        })?,
+                )
+            }
+            _ => return Ok(None),
+        },
+        DataType::Array(_) | DataType::Map(_) | DataType::Multiset(_) | 
DataType::Row(_) => {
+            return Ok(None);
+        }
+    };
+
+    Ok(Some(Scalar::new(array)))
+}
+
+fn timestamp_scalar(
+    millis: i64,
+    nanos: i32,
+    precision: u32,
+    timezone: Option<&'static str>,
+) -> crate::Result<Option<ArrowArrayRef>> {
+    let array: ArrowArrayRef = match precision {
+        0..=3 => {
+            let array = 
TimestampMillisecondArray::new_scalar(millis).into_inner();
+            match timezone {
+                Some(tz) => Arc::new(array.with_timezone(tz)),
+                None => Arc::new(array),
+            }
+        }
+        4..=6 => {
+            let value = millis * 1_000 + (nanos as i64) / 1_000;
+            let array = 
TimestampMicrosecondArray::new_scalar(value).into_inner();
+            match timezone {
+                Some(tz) => Arc::new(array.with_timezone(tz)),
+                None => Arc::new(array),
+            }
+        }
+        7..=9 => {
+            let value = millis * 1_000_000 + (nanos as i64);
+            let array = 
TimestampNanosecondArray::new_scalar(value).into_inner();
+            match timezone {
+                Some(tz) => Arc::new(array.with_timezone(tz)),
+                None => Arc::new(array),
+            }
+        }
+        _ => return Ok(None),
+    };
+    Ok(Some(array))
+}
+
+fn integer_literal(literal: &Datum) -> Option<i128> {
+    match literal {
+        Datum::TinyInt(value) => Some(i128::from(*value)),
+        Datum::SmallInt(value) => Some(i128::from(*value)),
+        Datum::Int(value) => Some(i128::from(*value)),
+        Datum::Long(value) => Some(i128::from(*value)),
+        _ => None,
+    }
+}
+
+fn float32_literal(literal: &Datum) -> Option<f32> {
+    match literal {
+        Datum::Float(value) => Some(*value),
+        Datum::Double(value) => {
+            let casted = *value as f32;
+            ((casted as f64) == *value).then_some(casted)
+        }
+        _ => None,
+    }
+}
+
+fn float64_literal(literal: &Datum) -> Option<f64> {
+    match literal {
+        Datum::Float(value) => Some(f64::from(*value)),
+        Datum::Double(value) => Some(*value),
+        _ => None,
+    }
+}
+
 // ---------------------------------------------------------------------------
 // Predicate → Vortex Expression conversion
 // ---------------------------------------------------------------------------
 
 /// Convert a list of Paimon predicates (ANDed together) into a single Vortex 
filter expression.
+#[cfg(test)]

Review Comment:
   nit: These Vortex expression conversion helpers are now test-only, while the 
production read path uses Arrow-side filtering. Keeping this test-only conversion 
block may lead future readers to think Vortex filter pushdown is still active. 
Could we remove it, or rename it / add a comment marking it as legacy conversion coverage?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to