stuhood commented on code in PR #22207:
URL: https://github.com/apache/datafusion/pull/22207#discussion_r3261280186


##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -133,13 +136,225 @@ impl Display for Partitioning {
                     .join(", ");
                 write!(f, "Hash([{phy_exprs_str}], {size})")
             }
+            Partitioning::Range(range) => write!(f, "{range}"),
             Partitioning::UnknownPartitioning(size) => {
                 write!(f, "UnknownPartitioning({size})")
             }
         }
     }
 }
 
+/// Physical range partitioning.
+///
+/// [`RangePartitioning`] describes range bounds over one or more physical
+/// expressions. Each [`RangePartition`] represents one output partition and must
+/// contain exactly one [`RangeInterval`] for each partition expression.
+///
+/// The source declaring this partitioning is responsible for ensuring that, for
+/// every emitted row, the row belongs to exactly one partition and is emitted by
+/// that partition. The declared ranges do not need to cover values that the plan
+/// cannot emit.
+///
+/// Each lower and upper bound explicitly records whether it is inclusive.
+/// Unbounded sides are represented with `None`; bound values should be non-null
+/// until null routing semantics are defined.
+///
+/// For example, a scan can declare date and city range partitions as:
+///
+/// ```text
+/// exprs = [date, city]
+///
+/// partition 0:
+///   date in [2021-01-01, 2022-01-01)
+///   city in [Allston, Boston)
+///
+/// partition 1:
+///   date in [2021-01-01, 2022-01-01)
+///   city in [Boston, NYC)
+/// ```

Review Comment:
   Are these supposed to represent compound keys or multi-dimensional partitioning?
   
   If they are compound keys, then I think it would be clearer to express them as:
   ```
   [
     (2021-01-01, Allston),
     (2022-01-01, Boston),
     ...
   ]
   ```
   
   If this is supposed to be multi-dimensional partitioning, then I think it might be unnecessary: as mentioned in the discussion thread, any particular join only needs to consider one dimension (possibly with compound keys).
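
For the compound-key reading, a minimal Rust sketch of routing against tuple split points might look like the following. It is illustrative only: `route_compound` is a hypothetical helper, not DataFusion API, and dates are modeled as ISO-8601 strings (which sort correctly as text) to keep the sketch self-contained.

```rust
/// Hypothetical router for compound-key range partitioning.
///
/// `splits` must be sorted ascending. Partition 0 holds keys below
/// `splits[0]`, partition `i` holds keys in `[splits[i - 1], splits[i])`,
/// and the last partition holds keys at or above the final split, so
/// `splits.len()` split points describe `splits.len() + 1` partitions.
/// Keys compare lexicographically through the standard `Ord` on tuples.
pub fn route_compound<K: Ord>(splits: &[K], key: &K) -> usize {
    // The number of split points that are <= key is the partition index.
    splits.partition_point(|split| split <= key)
}
```

With splits `[("2021-01-01", "Allston"), ("2022-01-01", "Boston")]`, the key `("2021-06-01", "Aaron")` lands in partition 1 even though `"Aaron" < "Allston"`: the city only breaks ties on the date. That is the main semantic difference from a per-dimension (box) reading of the same bounds.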



##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -133,13 +136,225 @@ impl Display for Partitioning {
                     .join(", ");
                 write!(f, "Hash([{phy_exprs_str}], {size})")
             }
+            Partitioning::Range(range) => write!(f, "{range}"),
             Partitioning::UnknownPartitioning(size) => {
                 write!(f, "UnknownPartitioning({size})")
             }
         }
     }
 }
 
+/// Physical range partitioning.
+///
+/// [`RangePartitioning`] describes range bounds over one or more physical
+/// expressions. Each [`RangePartition`] represents one output partition and must
+/// contain exactly one [`RangeInterval`] for each partition expression.
+///
+/// The source declaring this partitioning is responsible for ensuring that, for
+/// every emitted row, the row belongs to exactly one partition and is emitted by
+/// that partition. The declared ranges do not need to cover values that the plan
+/// cannot emit.
+///
+/// Each lower and upper bound explicitly records whether it is inclusive.
+/// Unbounded sides are represented with `None`; bound values should be non-null
+/// until null routing semantics are defined.

Review Comment:
   It would probably be good to figure out the null semantics early, which I think could involve baking a `SortOptions` struct in here (or at least a nulls-first/nulls-last flag).
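
As a sketch of what baking null ordering in could mean, the following Rust routes a nullable key so that nulls go to the first or last partition depending on a nulls-first flag. `NullOrdering` and `route_nullable` are hypothetical; `NullOrdering` is a local stand-in for arrow's `SortOptions` (which carries `descending` and `nulls_first`). Only ascending order is modeled, and the split-point layout is an assumption for illustration.

```rust
/// Hypothetical stand-in for the null-placement part of arrow's `SortOptions`.
#[derive(Debug, Clone, Copy)]
pub struct NullOrdering {
    /// If true, nulls sort before every non-null value; otherwise after.
    pub nulls_first: bool,
}

/// Routes a nullable key against ascending, non-null split points.
///
/// Non-null values use the `[splits[i - 1], splits[i])` layout. Nulls are
/// treated as the smallest (nulls-first) or largest (nulls-last) value, so
/// they land in the first or last partition respectively.
pub fn route_nullable(splits: &[i64], key: Option<i64>, ordering: NullOrdering) -> usize {
    match key {
        Some(v) => splits.partition_point(|s| *s <= v),
        None if ordering.nulls_first => 0,
        None => splits.len(),
    }
}
```

Keeping nulls at one end preserves a single total order over the key, so partitions stay contiguous in sort order whichever null placement is chosen.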



##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -133,13 +136,225 @@ impl Display for Partitioning {
                     .join(", ");
                 write!(f, "Hash([{phy_exprs_str}], {size})")
             }
+            Partitioning::Range(range) => write!(f, "{range}"),
             Partitioning::UnknownPartitioning(size) => {
                 write!(f, "UnknownPartitioning({size})")
             }
         }
     }
 }
 
+/// Physical range partitioning.
+///
+/// [`RangePartitioning`] describes range bounds over one or more physical
+/// expressions. Each [`RangePartition`] represents one output partition and must
+/// contain exactly one [`RangeInterval`] for each partition expression.
+///
+/// The source declaring this partitioning is responsible for ensuring that, for
+/// every emitted row, the row belongs to exactly one partition and is emitted by
+/// that partition. The declared ranges do not need to cover values that the plan
+/// cannot emit.
+///
+/// Each lower and upper bound explicitly records whether it is inclusive.
+/// Unbounded sides are represented with `None`; bound values should be non-null
+/// until null routing semantics are defined.
+///
+/// For example, a scan can declare date and city range partitions as:
+///
+/// ```text
+/// exprs = [date, city]
+///
+/// partition 0:
+///   date in [2021-01-01, 2022-01-01)
+///   city in [Allston, Boston)
+///
+/// partition 1:
+///   date in [2021-01-01, 2022-01-01)
+///   city in [Boston, NYC)
+/// ```
+///
+/// NOTE: Optimizer and execution behavior for this partitioning is intentionally
+/// not implemented and will be introduced incrementally. This public API keeps
+/// the partition ranges explicit for users. Repartitioning may compile the same
+/// metadata into a more efficient internal router.
+#[derive(Debug, Clone)]
+pub struct RangePartitioning {
+    partition_exprs: Vec<Arc<dyn PhysicalExpr>>,
+    partitions: Vec<RangePartition>,
+}
+
+impl RangePartitioning {
+    /// Creates range partitioning metadata.
+    ///
+    /// The caller is responsible for ensuring each partition has one range per
+    /// partition expression and for satisfying the contract documented on
+    /// [`RangePartitioning`].
+    pub fn new(
+        partition_exprs: Vec<Arc<dyn PhysicalExpr>>,
+        partitions: Vec<RangePartition>,
+    ) -> Self {
+        Self {
+            partition_exprs,
+            partitions,
+        }
+    }
+
+    /// Returns the partition expressions.
+    pub fn partition_exprs(&self) -> &[Arc<dyn PhysicalExpr>] {
+        &self.partition_exprs
+    }
+
+    /// Returns the declared range partitions.
+    pub fn partitions(&self) -> &[RangePartition] {
+        &self.partitions
+    }
+
+    /// Returns the number of partitions.
+    pub fn partition_count(&self) -> usize {
+        self.partitions.len()
+    }
+
+    fn project(
+        &self,
+        mapping: &ProjectionMapping,
+        input_eq_properties: &EquivalenceProperties,
+    ) -> Option<Self> {
+        let partition_exprs = input_eq_properties
+            .project_expressions(&self.partition_exprs, mapping)
+            .collect::<Option<Vec<_>>>()?;
+
+        Some(Self {
+            partition_exprs,
+            partitions: self.partitions.clone(),
+        })
+    }
+}
+
+impl Display for RangePartitioning {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        let partitions = self
+            .partitions
+            .iter()
+            .map(|partition| format!("{partition}"))
+            .collect::<Vec<_>>()
+            .join(", ");
+        write!(
+            f,
+            "Range({}, [{}], {})",
+            format_physical_expr_list(&self.partition_exprs),
+            partitions,
+            self.partition_count()
+        )
+    }
+}
+
+impl PartialEq for RangePartitioning {
+    fn eq(&self, other: &Self) -> bool {
+        physical_exprs_equal(&self.partition_exprs, &other.partition_exprs)
+            && self.partitions == other.partitions
+    }
+}
+
+/// Ranges for one output partition in a [`RangePartitioning`].
+#[derive(Debug, Clone, PartialEq)]
+pub struct RangePartition {
+    ranges: Vec<RangeInterval>,
+}
+
+impl RangePartition {
+    /// Creates a partition from one range per partition expression.
+    pub fn new(ranges: Vec<RangeInterval>) -> Self {
+        Self { ranges }
+    }
+
+    /// Returns the ranges for this partition.
+    pub fn ranges(&self) -> &[RangeInterval] {
+        &self.ranges
+    }
+}
+
+impl Display for RangePartition {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        let ranges = self
+            .ranges
+            .iter()
+            .map(|range| format!("{range}"))
+            .collect::<Vec<_>>()
+            .join(", ");
+        write!(f, "({ranges})")
+    }
+}
+
+/// A scalar interval in one range partition dimension.
+#[derive(Debug, Clone, PartialEq)]
+pub struct RangeInterval {
+    lower: Option<RangeBound>,
+    upper: Option<RangeBound>,

Review Comment:
   The problem with encoding these as intervals rather than points (as suggested [here](https://github.com/apache/datafusion/issues/21992#issuecomment-4452796089)) is that a more efficient repartitioning strategy based on a sorted representation would first have to convert this representation _back_ into points. That conversion involves a bunch of validation: that the ranges are non-overlapping, sorted, contiguous (so that you can floor), etc.
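
To illustrate the validation described above, here is a minimal Rust sketch that converts a one-dimensional list of intervals back into split points. `Bound`, `Interval`, and `intervals_to_splits` are hypothetical simplifications (not the PR's `RangeBound`/`RangeInterval`); keys are assumed to be `i64`, and the point form is assumed to use half-open `[lo, hi)` partitions.

```rust
/// Hypothetical one-dimensional bound: a value and whether it is inclusive.
#[derive(Debug, Clone, PartialEq)]
pub struct Bound {
    pub value: i64,
    pub inclusive: bool,
}

/// Hypothetical interval; `None` means unbounded on that side.
#[derive(Debug, Clone, PartialEq)]
pub struct Interval {
    pub lower: Option<Bound>,
    pub upper: Option<Bound>,
}

/// Converts per-partition intervals into ascending split points, rejecting
/// anything a sorted `[lo, hi)` router cannot represent.
pub fn intervals_to_splits(intervals: &[Interval]) -> Result<Vec<i64>, String> {
    let (first, last) = match (intervals.first(), intervals.last()) {
        (Some(first), Some(last)) => (first, last),
        _ => return Err("no partitions declared".to_string()),
    };
    // Complete coverage: the outermost partitions must be unbounded.
    if first.lower.is_some() || last.upper.is_some() {
        return Err("outer partitions must be unbounded".to_string());
    }
    let mut splits: Vec<i64> = Vec::with_capacity(intervals.len() - 1);
    for (i, pair) in intervals.windows(2).enumerate() {
        let (upper, lower) = match (&pair[0].upper, &pair[1].lower) {
            (Some(upper), Some(lower)) => (upper, lower),
            _ => return Err(format!("partitions {} and {} do not meet", i, i + 1)),
        };
        // Contiguous, non-overlapping, and half-open: [.., x) followed by [x, ..).
        if upper.value != lower.value || upper.inclusive || !lower.inclusive {
            return Err(format!("partitions {} and {} are not contiguous", i, i + 1));
        }
        // Sorted: split points must strictly increase (no empty or reversed ranges).
        if let Some(prev) = splits.last() {
            if *prev >= upper.value {
                return Err(format!("split points do not increase at partition {}", i));
            }
        }
        splits.push(upper.value);
    }
    Ok(splits)
}
```

Each error branch is one of the special cases listed above; starting from split points, only the "strictly increasing" check would remain.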
   
   I don't feel strongly about it, but I think a point-based representation involves far fewer special cases.
   
   A points representation _must_ cover the entire set of valid values (by construction). That rules out using the partitioning strategy to short-circuit when the ranges only partially cover the valid values (partial in the sense of a partial function, e.g. `TryFrom` vs `From`). But honestly, I don't think allowing partial partitioning is a good idea anyway. For example, the Repartition operator wouldn't know what to do with a row that didn't map to any partition: it can't discard rows, because it doesn't know which operator is consuming them, so it would have to error. So I think that in practice all range partitioning strategies would need to be complete anyway, and this extra generality just adds complexity.
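
The completeness argument can be sketched in Rust as follows. A router over split points is a total function (it always returns a partition), whereas a router over an arbitrary interval list can only return a `Result`, and an operator that cannot drop rows has to surface the miss as an error. Both functions are hypothetical illustrations, assuming `i64` keys and half-open `[lo, hi)` intervals.

```rust
/// Point-based routing: `splits.len() + 1` partitions that cover every i64
/// by construction, so the result needs no error case.
pub fn route_points(splits: &[i64], value: i64) -> usize {
    splits.partition_point(|split| *split <= value)
}

/// Interval-list routing: declared `[lo, hi)` ranges may leave gaps, so a
/// row can match no partition. Since the row cannot be silently dropped,
/// the only option left is to return an error.
pub fn route_intervals(ranges: &[(i64, i64)], value: i64) -> Result<usize, String> {
    ranges
        .iter()
        .position(|&(lo, hi)| lo <= value && value < hi)
        .ok_or_else(|| format!("value {value} matches no declared range"))
}
```

For splits `[10, 20]`, `route_points` maps even `i64::MIN` and `i64::MAX` to partitions 0 and 2, while `route_intervals` over `[(0, 10), (10, 20)]` has no answer for 25.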



-- 
This is an automated message from the Apache Git Service.