gabotechs commented on code in PR #22777:
URL: https://github.com/apache/datafusion/pull/22777#discussion_r3372997220


##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -4397,22 +4428,142 @@ impl Debug for Subquery {
     }
 }
 
-/// Logical partitioning schemes supported by [`LogicalPlan::Repartition`]
+/// Logical partitioning schemes.
 ///
-/// See [`Partitioning`] for more details on partitioning
+/// A scheme can describe either requested repartitioning in
+/// [`LogicalPlan::Repartition`] or a partitioning property declared by a source.
+/// Some schemes are only valid as metadata until planner support is added.
 ///
-/// [`Partitioning`]: 
https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html#
+/// For physical execution partitioning, see
+/// [`datafusion_physical_expr::Partitioning`].
+///
+/// [`datafusion_physical_expr::Partitioning`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html#
 #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
 pub enum Partitioning {
     /// Allocate batches using a round-robin algorithm and the specified 
number of partitions
     RoundRobinBatch(usize),
     /// Allocate rows based on a hash of one of more expressions and the 
specified number
     /// of partitions.
     Hash(Vec<Expr>, usize),
+    /// Partition rows by ranges.
+    /// See [`RangePartitioning`] for the logical contract.
+    Range(RangePartitioning),
     /// The DISTRIBUTE BY clause is used to repartition the data based on the 
input expressions
     DistributeBy(Vec<Expr>),
 }
 
+impl Partitioning {
+    /// Return the number of partitions, if known.
+    pub fn partition_count(&self) -> Option<usize> {
+        match self {
+            Self::RoundRobinBatch(partition_count) | Self::Hash(_, partition_count) => {
+                Some(*partition_count)
+            }
+            Self::Range(range) => Some(range.partition_count()),
+            Self::DistributeBy(_) => None,
+        }
+    }
+}
+
+/// Logical range partitioning.
+///
+/// [`RangePartitioning`] describes an ordered logical key space with split points.
+///
+/// - `ordering` defines the partitioning key and ordering using logical
+///   [`SortExpr`]s.
+/// - `split_points` define the boundaries between adjacent partitions.
+///
+/// Comparisons use the lexicographic order defined by `ordering`,
+/// including `ASC`/`DESC` and null ordering. Split points must be ordered
+/// according to that ordering, and each split point must have one value per
+/// ordering expression. See [`SplitPoint`] for the shared boundary contract.
+///
+/// The expressions are resolved against the declaring plan's schema. This
+/// constructor does not validate split point value types against the resolved
+/// expression types. Like other user-specified data properties such as
+/// sortedness, if a source declares range partitioning, it is responsible for
+/// placing each row in the partition described by the split points. DataFusion
+/// will not validate this is upheld.
+///
+/// NOTE: Planning [`LogicalPlan::Repartition`] with range partitioning is not
+/// currently supported. Range-aware optimizer and execution behavior will be
+/// introduced incrementally. See
+/// <https://github.com/apache/datafusion/issues/22395>.
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
+pub struct RangePartitioning {
+    /// Ordered logical partitioning key.
+    ordering: Vec<SortExpr>,
+    /// Boundaries between adjacent partitions.
+    split_points: Vec<SplitPoint>,
+}
+
+impl RangePartitioning {
+    /// Creates logical range partitioning metadata and validates split point
+    /// shape and ordering.
+    pub fn try_new(
+        ordering: Vec<SortExpr>,
+        split_points: Vec<SplitPoint>,
+    ) -> Result<Self> {
+        if ordering.is_empty() {
+            return plan_err!("Range partitioning requires non-empty ordering");
+        }
+
+        validate_range_split_points(&split_points, &logical_sort_options(&ordering))?;
+
+        Ok(Self {
+            ordering,
+            split_points,
+        })
+    }
+
+    /// Return the number of partitions.
+    pub fn partition_count(&self) -> usize {
+        self.split_points.len() + 1
+    }
+
+    /// Returns the ordering that defines the range key.
+    pub fn ordering(&self) -> &[SortExpr] {
+        &self.ordering
+    }
+
+    /// Returns the ordered split points between partitions.
+    pub fn split_points(&self) -> &[SplitPoint] {
+        &self.split_points
+    }
+}
+
+fn logical_sort_options(ordering: &[SortExpr]) -> Vec<SortOptions> {
+    ordering
+        .iter()
+        .map(|sort_expr| SortOptions {
+            descending: !sort_expr.asc,
+            nulls_first: sort_expr.nulls_first,
+        })
+        .collect()
+}
+
+impl Display for RangePartitioning {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        let ordering = self
+            .ordering()
+            .iter()
+            .map(ToString::to_string)
+            .collect::<Vec<_>>()
+            .join(", ");
+        let split_points = self

Review Comment:
   Using the `.join()` method from `itertools::Itertools` should allow you to avoid the intermediate `Vec` allocation:
   
   ```suggestion
               .map(ToString::to_string)
               .join(", ");
   ```



##########
datafusion/core/src/physical_planner.rs:
##########
@@ -1264,6 +1264,14 @@ impl DefaultPhysicalPlanner {
                             .collect::<Result<Vec<_>>>()?;
                         Partitioning::Hash(runtime_expr, *n)
                     }
+                    LogicalPartitioning::Range(_) => {
+                        // TODO: Support planning LogicalPlan::Repartition with
+                        // range partitioning.
+                        // Tracked by https://github.com/apache/datafusion/issues/22786
+                        return not_impl_err!(
+                            "Physical plan does not support Range repartitioning"
+                        );
+                    }

Review Comment:
   Don't we have everything we need already for creating this mapping?
   
   ```rust
                       LogicalPartitioning::Range(v) => {
                           let sort_exprs = create_physical_sort_exprs(
                               v.ordering(),
                               input_dfschema,
                               execution_props,
                           )?;
                           Partitioning::Range(RangePartitioning::new(
                               LexOrdering::new(sort_exprs).unwrap(),
                               v.split_points().to_vec(),
                           ))
   ```



##########
datafusion/core/src/physical_planner.rs:
##########
@@ -3300,6 +3308,25 @@ mod tests {
         aggregate_explain(&logical_plan).await
     }
 
+    #[tokio::test]
+    async fn logical_range_repartition_is_not_supported() -> Result<()> {

Review Comment:
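   It would be cool if we could make this actually pass in this PR, i.e. support planning range repartitioning here instead of testing that it returns a not-implemented error.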



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -4397,22 +4428,142 @@ impl Debug for Subquery {
     }
 }
 
-/// Logical partitioning schemes supported by [`LogicalPlan::Repartition`]
+/// Logical partitioning schemes.
 ///
-/// See [`Partitioning`] for more details on partitioning
+/// A scheme can describe either requested repartitioning in
+/// [`LogicalPlan::Repartition`] or a partitioning property declared by a source.
+/// Some schemes are only valid as metadata until planner support is added.
 ///
-/// [`Partitioning`]: 
https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html#
+/// For physical execution partitioning, see
+/// [`datafusion_physical_expr::Partitioning`].
+///
+/// [`datafusion_physical_expr::Partitioning`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/enum.Partitioning.html#
 #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
 pub enum Partitioning {
     /// Allocate batches using a round-robin algorithm and the specified 
number of partitions
     RoundRobinBatch(usize),
     /// Allocate rows based on a hash of one of more expressions and the 
specified number
     /// of partitions.
     Hash(Vec<Expr>, usize),
+    /// Partition rows by ranges.
+    /// See [`RangePartitioning`] for the logical contract.
+    Range(RangePartitioning),
     /// The DISTRIBUTE BY clause is used to repartition the data based on the 
input expressions
     DistributeBy(Vec<Expr>),
 }
 
+impl Partitioning {
+    /// Return the number of partitions, if known.
+    pub fn partition_count(&self) -> Option<usize> {
+        match self {
+            Self::RoundRobinBatch(partition_count) | Self::Hash(_, partition_count) => {
+                Some(*partition_count)
+            }
+            Self::Range(range) => Some(range.partition_count()),
+            Self::DistributeBy(_) => None,
+        }
+    }
+}
+
+/// Logical range partitioning.
+///
+/// [`RangePartitioning`] describes an ordered logical key space with split points.
+///
+/// - `ordering` defines the partitioning key and ordering using logical
+///   [`SortExpr`]s.
+/// - `split_points` define the boundaries between adjacent partitions.
+///
+/// Comparisons use the lexicographic order defined by `ordering`,
+/// including `ASC`/`DESC` and null ordering. Split points must be ordered
+/// according to that ordering, and each split point must have one value per
+/// ordering expression. See [`SplitPoint`] for the shared boundary contract.
+///
+/// The expressions are resolved against the declaring plan's schema. This
+/// constructor does not validate split point value types against the resolved
+/// expression types. Like other user-specified data properties such as
+/// sortedness, if a source declares range partitioning, it is responsible for
+/// placing each row in the partition described by the split points. DataFusion
+/// will not validate this is upheld.
+///
+/// NOTE: Planning [`LogicalPlan::Repartition`] with range partitioning is not
+/// currently supported. Range-aware optimizer and execution behavior will be
+/// introduced incrementally. See
+/// <https://github.com/apache/datafusion/issues/22395>.
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
+pub struct RangePartitioning {
+    /// Ordered logical partitioning key.
+    ordering: Vec<SortExpr>,
+    /// Boundaries between adjacent partitions.
+    split_points: Vec<SplitPoint>,
+}
+
+impl RangePartitioning {
+    /// Creates logical range partitioning metadata and validates split point
+    /// shape and ordering.
+    pub fn try_new(
+        ordering: Vec<SortExpr>,
+        split_points: Vec<SplitPoint>,
+    ) -> Result<Self> {
+        if ordering.is_empty() {
+            return plan_err!("Range partitioning requires non-empty ordering");
+        }
+
+        validate_range_split_points(&split_points, &logical_sort_options(&ordering))?;
+
+        Ok(Self {
+            ordering,
+            split_points,
+        })
+    }
+
+    /// Return the number of partitions.
+    pub fn partition_count(&self) -> usize {
+        self.split_points.len() + 1
+    }
+
+    /// Returns the ordering that defines the range key.
+    pub fn ordering(&self) -> &[SortExpr] {
+        &self.ordering
+    }
+
+    /// Returns the ordered split points between partitions.
+    pub fn split_points(&self) -> &[SplitPoint] {
+        &self.split_points
+    }
+}
+
+fn logical_sort_options(ordering: &[SortExpr]) -> Vec<SortOptions> {
+    ordering
+        .iter()
+        .map(|sort_expr| SortOptions {
+            descending: !sort_expr.asc,
+            nulls_first: sort_expr.nulls_first,
+        })
+        .collect()
+}
+
+impl Display for RangePartitioning {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        let ordering = self
+            .ordering()
+            .iter()
+            .map(ToString::to_string)
+            .collect::<Vec<_>>()
+            .join(", ");
+        let split_points = self
+            .split_points()
+            .iter()
+            .map(ToString::to_string)
+            .collect::<Vec<_>>()

Review Comment:
   Same as above: use `Itertools::join` here too to avoid the intermediate `Vec` allocation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to