ozankabak commented on code in PR #14111: URL: https://github.com/apache/datafusion/pull/14111#discussion_r1916273932
########## datafusion/physical-plan/src/memory.rs: ########## @@ -88,14 +90,25 @@ impl DisplayAs for MemoryExec { }) .unwrap_or_default(); + let constraints = + if self.cache.equivalence_properties().constraints().is_empty() { + String::new() + } else { + format!(", {}", self.cache.equivalence_properties().constraints()) + }; Review Comment: ```suggestion let constraints = self.cache.equivalence_properties().constraints(); let constraints = if constraints.is_empty() { String::new() } else { format!(", {}", constraints) }; ``` ########## datafusion/physical-expr/src/equivalence/properties.rs: ########## @@ -548,6 +570,88 @@ impl EquivalenceProperties { true } + /// Checks if the sort requirements are satisfied by any of the table constraints (primary key or unique). + /// Returns true if any constraint fully satisfies the requirements. + fn satisfied_by_constraints( + &self, + normalized_reqs: &[PhysicalSortRequirement], + ) -> bool { + self.constraints.iter().any(|constraint| match constraint { + Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => self + .satisfied_by_constraint( + normalized_reqs, + indices, + matches!(constraint, Constraint::Unique(_)), + ), + }) + } + + /// Checks if sort requirements are satisfied by a constraint (primary key or unique). + /// Returns true if the constraint indices form a valid prefix of an existing ordering + /// that matches the requirements. For unique constraints, also verifies nullable columns. 
+ fn satisfied_by_constraint( + &self, + normalized_reqs: &[PhysicalSortRequirement], + indices: &[usize], + check_null: bool, + ) -> bool { + // Requirements must contain indices + if indices.len() > normalized_reqs.len() { + return false; + } + + // Find orderings that contain all required indices + let prefix_candidates = self + .oeq_class + .iter() + .filter(|&ordering| { + if indices.len() > ordering.len() { + return false; + } + + // Check if all constraint indices appear in this ordering + indices.iter().all(|&idx| { Review Comment: Isn't this a quadratic time-complexity approach to a problem that seems to be solvable with linear complexity? ########## datafusion/common/src/functional_dependencies.rs: ########## @@ -60,6 +60,41 @@ impl Constraints { pub fn is_empty(&self) -> bool { self.inner.is_empty() } + + /// Projects constraints using the given projection indices. + /// Returns None if any of the constraint columns are not included in the projection. + pub fn project(&self, proj_indices: &[usize]) -> Option<Self> { + let projected = self + .inner + .iter() + .filter_map(|constraint| { + match constraint { + Constraint::PrimaryKey(indices) => { + let new_indices = + update_elements_with_matching_indices(indices, proj_indices); Review Comment: If you refactor the `update_elements_with_matching_indices` function to take two `impl Iterator`'s (you probably need to replace the looping order to do that), this function can also accept `proj_indices` as an `impl Iterator`. ########## datafusion/physical-expr/src/equivalence/properties.rs: ########## @@ -966,21 +1070,46 @@ impl EquivalenceProperties { projected_constants } - /// Projects the equivalences within according to `projection_mapping` + /// Projects constraints according to the given projection mapping. + /// + /// This function takes a projection mapping and extracts the column indices of the target columns. 
+ /// It then projects the constraints to only include relationships between + /// columns that exist in the projected output. + /// + /// # Arguments + /// + /// * `mapping` - A reference to `ProjectionMapping` that defines how expressions are mapped + /// in the projection operation + /// + /// # Returns + /// + /// Returns a new `Constraints` object containing only the constraints + /// that are valid for the projected columns. + fn projected_constraints(&self, mapping: &ProjectionMapping) -> Option<Constraints> { + let indices = mapping + .iter() + .filter_map(|(_, target)| target.as_any().downcast_ref::<Column>()) + .map(|col| col.index()) + .collect::<Vec<_>>(); + debug_assert_eq!(mapping.map.len(), indices.len()); + self.constraints.project(&indices) Review Comment: You will not need to collect and materialize `indices` if you refactor `project` to accept an iterator. See my comment in `functional_dependencies.rs`. ########## datafusion/core/src/datasource/physical_plan/file_scan_config.rs: ########## @@ -210,30 +221,25 @@ impl FileScanConfig { self } - /// Project the schema and the statistics on the given column indices - pub fn project(&self) -> (SchemaRef, Statistics, Vec<LexOrdering>) { + /// Project the schema, constraints, and the statistics on the given column indices + pub fn project(&self) -> Self { if self.projection.is_none() && self.table_partition_cols.is_empty() { - return ( - Arc::clone(&self.file_schema), - self.statistics.clone(), - self.output_ordering.clone(), - ); + return self.clone(); } - let proj_iter: Box<dyn Iterator<Item = usize>> = match &self.projection { - Some(proj) => Box::new(proj.iter().copied()), - None => Box::new( - 0..(self.file_schema.fields().len() + self.table_partition_cols.len()), - ), - }; + // Get projection indices once, reuse for both schema and constraints + let proj_indices = self.projection.clone().unwrap_or_else(|| { + (0..(self.file_schema.fields().len() + self.table_partition_cols.len())) + 
.collect::<Vec<_>>() + }); Review Comment: Looking at the usages of `proj_indices` below, it seems like we are making an unnecessary/avoidable clone here ########## datafusion/physical-expr/src/equivalence/properties.rs: ########## @@ -548,6 +570,88 @@ impl EquivalenceProperties { true } + /// Checks if the sort requirements are satisfied by any of the table constraints (primary key or unique). + /// Returns true if any constraint fully satisfies the requirements. + fn satisfied_by_constraints( + &self, + normalized_reqs: &[PhysicalSortRequirement], + ) -> bool { + self.constraints.iter().any(|constraint| match constraint { + Constraint::PrimaryKey(indices) | Constraint::Unique(indices) => self + .satisfied_by_constraint( + normalized_reqs, + indices, + matches!(constraint, Constraint::Unique(_)), + ), + }) + } + + /// Checks if sort requirements are satisfied by a constraint (primary key or unique). + /// Returns true if the constraint indices form a valid prefix of an existing ordering + /// that matches the requirements. For unique constraints, also verifies nullable columns. 
+ fn satisfied_by_constraint( + &self, + normalized_reqs: &[PhysicalSortRequirement], + indices: &[usize], + check_null: bool, + ) -> bool { + // Requirements must contain indices + if indices.len() > normalized_reqs.len() { + return false; + } + + // Find orderings that contain all required indices + let prefix_candidates = self + .oeq_class + .iter() + .filter(|&ordering| { + if indices.len() > ordering.len() { + return false; + } + + // Check if all constraint indices appear in this ordering + indices.iter().all(|&idx| { + ordering.iter().enumerate().any(|(pos, req)| { + // Only handle Column expressions + let Some(col) = req.expr.as_any().downcast_ref::<Column>() else { + return false; + }; + + // Column index must match + if col.index() != idx { + return false; + } + + // For unique constraints, verify column is not nullable if it's first/last + if check_null && (pos == 0 || pos == ordering.len() - 1) { + return !col.nullable(&self.schema).unwrap_or(true); + } + + true + }) + }) + }) + .collect::<Vec<_>>(); + + if prefix_candidates.is_empty() { + return false; + } + + // Check if any candidate ordering matches requirements prefix + prefix_candidates.iter().any(|&ordering| { Review Comment: Although less suspicious, the above comment *may* apply here too -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org