mingmwang commented on code in PR #3855:
URL: https://github.com/apache/arrow-datafusion/pull/3855#discussion_r998924076
##########
datafusion/core/src/physical_plan/mod.rs:
##########
@@ -487,6 +574,82 @@ impl Partitioning {
RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
}
}
+
+ /// Returns true when the guarantees made by this [[Partitioning]] are
sufficient to
+ /// satisfy the partitioning scheme mandated by the `required`
[[Distribution]]
+ pub fn satisfy<F: FnOnce() -> Vec<Vec<Column>>>(
+ &self,
+ required: Distribution,
+ equal_properties: F,
+ ) -> bool {
+ match required {
+ Distribution::UnspecifiedDistribution => true,
+ Distribution::SinglePartition if self.partition_count() == 1 =>
true,
+ Distribution::HashPartitioned(required_exprs) => {
+ match self {
+ // Here we do not check the partition count for hash
partitioning and assumes the partition count
+ // and hash functions in the system are the same. In
future if we plan to support storage partition-wise joins,
+ // then we need to have the partition count and hash
functions validation.
+ Partitioning::Hash(partition_exprs, _) => {
+ let fast_match =
+ expr_list_eq_any_order(&required_exprs,
partition_exprs);
+ // If the required exprs do not match, need to
leverage the eq_properties provided by the child
+ // and normalize both exprs based on the eq_properties
+ if !fast_match {
+ let eq_properties = equal_properties();
+ if !eq_properties.is_empty() {
+ let normalized_required_exprs = required_exprs
+ .iter()
+ .map(|e| {
+
normalize_expr_with_equivalence_properties(
+ e.clone(),
+ &eq_properties,
+ )
+ })
+ .collect::<Vec<_>>();
+ let normalized_partition_exprs =
partition_exprs
+ .iter()
+ .map(|e| {
+
normalize_expr_with_equivalence_properties(
+ e.clone(),
+ &eq_properties,
+ )
+ })
+ .collect::<Vec<_>>();
+ expr_list_eq_any_order(
+ &normalized_required_exprs,
+ &normalized_partition_exprs,
+ )
+ } else {
+ fast_match
+ }
+ } else {
+ fast_match
+ }
+ }
+ _ => false,
+ }
+ }
+ _ => false,
+ }
+ }
+}
+
+impl PartialEq for Partitioning {
+ fn eq(&self, other: &Partitioning) -> bool {
+ match (self, other) {
+ (
+ Partitioning::RoundRobinBatch(count1),
+ Partitioning::RoundRobinBatch(count2),
+ ) if count1 == count2 => true,
+ (Partitioning::Hash(exprs1, count1), Partitioning::Hash(exprs2,
count2))
+ if expr_list_eq_any_order(exprs1, exprs2) && (count1 ==
count2) =>
Review Comment:
Yeah, this is my fault; I will fix it. This `PartialEq` implementation is used by the
partition-aware union to check whether the partitioning specs are the same.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]