alamb commented on code in PR #22207:
URL: https://github.com/apache/datafusion/pull/22207#discussion_r3277259688


##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -277,19 +466,27 @@ impl Partitioning {
         mapping: &ProjectionMapping,
         input_eq_properties: &EquivalenceProperties,
     ) -> Self {
-        if let Partitioning::Hash(exprs, part) = self {
-            let normalized_exprs = input_eq_properties
-                .project_expressions(exprs, mapping)
-                .zip(exprs)
-                .map(|(proj_expr, expr)| {
-                    proj_expr.unwrap_or_else(|| {
-                        Arc::new(UnKnownColumn::new(&expr.to_string()))
+        match self {
+            Partitioning::Hash(exprs, part) => {
+                let normalized_exprs = input_eq_properties
+                    .project_expressions(exprs, mapping)
+                    .zip(exprs)
+                    .map(|(proj_expr, expr)| {
+                        proj_expr.unwrap_or_else(|| {
+                            Arc::new(UnKnownColumn::new(&expr.to_string()))
+                        })
                     })
-                })
-                .collect();
-            Partitioning::Hash(normalized_exprs, *part)
-        } else {
-            self.clone()
+                    .collect();
+                Partitioning::Hash(normalized_exprs, *part)
+            }
+            Partitioning::Range(range) => {
+                if let Some(projected) = range.project(mapping, 
input_eq_properties) {
+                    Partitioning::Range(projected)
+                } else {
+                    Partitioning::UnknownPartitioning(range.partition_count())
+                }
+            }
+            _ => self.clone(),

Review Comment:
   nit: I think the code would be more "future proof" if this listed out the 
other variants explicitly rather than a catch all `_`. That way if we add a new 
partitioning in the future the compiler will tell us



##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -845,4 +1045,140 @@ mod tests {
 
         Ok(())
     }
+
+    fn int_split_point(values: impl IntoIterator<Item = i64>) -> SplitPoint {
+        SplitPoint::new(
+            values
+                .into_iter()
+                .map(|value| ScalarValue::Int64(Some(value)))
+                .collect(),
+        )
+    }
+
+    #[test]
+    fn test_range_partitioning_metadata() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![Field::new("a", 
DataType::Int64, false)]));
+        let col_a: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("a", &schema)?);
+
+        let range_partitioning = RangePartitioning::new(
+            [PhysicalSortExpr::new_default(Arc::clone(&col_a))].into(),
+            vec![int_split_point([10]), int_split_point([20])],
+        );
+        assert_eq!(range_partitioning.ordering()[0].to_string(), "a@0 ASC");
+        assert_eq!(
+            range_partitioning.split_points(),
+            &[int_split_point([10]), int_split_point([20])]
+        );
+        let partitioning = Partitioning::Range(range_partitioning);
+
+        assert_eq!(partitioning.partition_count(), 3);
+        assert_eq!(
+            partitioning.to_string(),
+            "Range([a@0 ASC], [(10), (20)], 3)"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_range_partitioning_project_preserves_or_degrades() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Int64, false),
+        ]));
+        let col_b: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("b", &schema)?);
+        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
+        let range_partitioning = Partitioning::Range(RangePartitioning::new(
+            [PhysicalSortExpr::new(col_b, SortOptions::new(true, 
false))].into(),
+            vec![int_split_point([10])],
+        ));
+
+        let keep_b_mapping = ProjectionMapping::from_indices(&[1], &schema)?;
+        let projected = range_partitioning.project(&keep_b_mapping, 
&eq_properties);
+        assert_eq!(
+            projected.to_string(),
+            "Range([b@0 DESC NULLS LAST], [(10)], 2)"
+        );
+
+        let drop_b_mapping = ProjectionMapping::from_indices(&[0], &schema)?;
+        let projected = range_partitioning.project(&drop_b_mapping, 
&eq_properties);
+        let Partitioning::UnknownPartitioning(partition_count) = projected 
else {
+            panic!("expected UnknownPartitioning, got {projected:?}");
+        };
+        assert_eq!(partition_count, 2);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_range_partitioning_project_degrades_if_ordering_collapses() -> 
Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Int64, false),
+        ]));
+        let col_a: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("a", &schema)?);
+        let col_b: Arc<dyn PhysicalExpr> =
+            Arc::new(Column::new_with_schema("b", &schema)?);
+        let target: Arc<dyn PhysicalExpr> = Arc::new(Column::new("x", 0));
+        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
+        let range_partitioning = Partitioning::Range(RangePartitioning::new(
+            [
+                PhysicalSortExpr::new_default(Arc::clone(&col_a)),
+                PhysicalSortExpr::new_default(Arc::clone(&col_b)),
+            ]
+            .into(),
+            vec![int_split_point([10, 100])],
+        ));
+        let mapping = ProjectionMapping::from_iter([
+            (
+                Arc::clone(&col_a),
+                ProjectionTargets::from(vec![(Arc::clone(&target), 0)]),
+            ),
+            (
+                Arc::clone(&col_b),
+                ProjectionTargets::from(vec![(Arc::clone(&target), 0)]),
+            ),
+        ]);
+
+        let projected = range_partitioning.project(&mapping, &eq_properties);
+        let Partitioning::UnknownPartitioning(partition_count) = projected 
else {
+            panic!("expected UnknownPartitioning, got {projected:?}");
+        };
+        assert_eq!(partition_count, 2);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_multi_partition_range_does_not_satisfy_hash_distribution() -> 
Result<()> {
+        let schema = Arc::new(Schema::new(vec![

Review Comment:
   the setup for the schema and creating col_a and col_b and the quivalence 
properties and range partitioning is the same in a bunch of these cases -- 
maybe it could be moved into a helper function to reduce code repetition which 
would make it easier to verify what each test is checking)
   
   
   For example, if you had something like this
   ```rust
   struct TestFixture {
     /// schema with columns a, b
     schema: SchemaRef,
     col: Arc<dyn PhysicalExpr>,
   ...
   }
   ```
   
   You could write this test with a lot less boilerplate like
   ```rust
    #[test]
       fn test_multi_partition_range_does_not_satisfy_hash_distribution() -> 
Result<()> {
         let fixture = TestFixture::new();
         let required = Distribution::HashPartitioned(vec![fixture.col_a, 
fixture.col_b]);
           assert_eq!(
               range_partitioning.satisfaction(&required, 
&fixture.eq_properties, false),
               PartitioningSatisfaction::NotSatisfied
           );
       }
   ```



##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -133,13 +137,197 @@ impl Display for Partitioning {
                     .join(", ");
                 write!(f, "Hash([{phy_exprs_str}], {size})")
             }
+            Partitioning::Range(range) => write!(f, "{range}"),
             Partitioning::UnknownPartitioning(size) => {
                 write!(f, "UnknownPartitioning({size})")
             }
         }
     }
 }
 
+/// Physical range partitioning.
+///
+/// [`RangePartitioning`] describes an ordered key space with split points.
+///
+/// - `ordering` defines the partitioning key and ordering.
+/// - `split_points` define the boundaries between adjacent partitions.
+///
+/// Comparisons use the lexicographic order defined by `ordering`, including
+/// `ASC`/`DESC` and null ordering. Split points must be strictly ordered
+/// according to that ordering, and each split point must have one value per
+/// ordering expression.
+///
+/// `N` split points define `N + 1` partitions:
+///
+/// ```text
+/// partition 0: key < split_points[0]
+/// partition 1: split_points[0] <= key < split_points[1]
+/// ...
+/// partition N - 1: split_points[N - 2] <= key < split_points[N - 1]
+/// partition N: split_points[N - 1] <= key
+/// ```
+///
+/// Values equal to split point `i` belong to partition `i + 1`, so interior
+/// partitions are lower-inclusive and upper-exclusive.
+///
+/// For a single range key:
+///
+/// ```text
+/// ordering = [date ASC NULLS LAST]
+/// split_points = [
+///   (2022-01-01),
+///   (2023-01-01),
+/// ]
+///
+/// partition 0: date before 2022-01-01
+/// partition 1: date between 2022-01-01 (inclusive) and 2023-01-01 (exclusive)
+/// partition 2: date at/after 2023-01-01
+/// ```
+///
+/// The same model extends to compound keys.
+/// For `ordering = [time ASC, city ASC]`, split points are ordered
+/// lexicographically by `(time, city)`:
+///
+/// ```text
+/// ordering = [time ASC NULLS LAST, city ASC NULLS LAST]
+/// split_points = [
+///   (2022, Allston),
+///   (2023, Allston),
+/// ]
+///
+/// partition 0: keys before  (2022, Allston)
+/// partition 1: keys between (2022, Allston) and (2023, Allston)
+/// partition 2: keys at/after (2023, Allston)
+/// ```
+///
+/// NOTE: Optimizer and execution behavior for this partitioning is 
intentionally
+/// not implemented and will be introduced incrementally.

Review Comment:
   Nit: for this comment it would be good t add a link to the epic so people 
can find the plan / current status
   
   ```suggestion
   /// not implemented and will be introduced incrementally. See
   /// <https://github.com/apache/datafusion/issues/22395>



##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -133,13 +137,197 @@ impl Display for Partitioning {
                     .join(", ");
                 write!(f, "Hash([{phy_exprs_str}], {size})")
             }
+            Partitioning::Range(range) => write!(f, "{range}"),
             Partitioning::UnknownPartitioning(size) => {
                 write!(f, "UnknownPartitioning({size})")
             }
         }
     }
 }
 
+/// Physical range partitioning.
+///
+/// [`RangePartitioning`] describes an ordered key space with split points.
+///
+/// - `ordering` defines the partitioning key and ordering.
+/// - `split_points` define the boundaries between adjacent partitions.
+///
+/// Comparisons use the lexicographic order defined by `ordering`, including
+/// `ASC`/`DESC` and null ordering. Split points must be strictly ordered
+/// according to that ordering, and each split point must have one value per
+/// ordering expression.
+///
+/// `N` split points define `N + 1` partitions:
+///
+/// ```text
+/// partition 0: key < split_points[0]
+/// partition 1: split_points[0] <= key < split_points[1]
+/// ...
+/// partition N - 1: split_points[N - 2] <= key < split_points[N - 1]
+/// partition N: split_points[N - 1] <= key
+/// ```
+///
+/// Values equal to split point `i` belong to partition `i + 1`, so interior
+/// partitions are lower-inclusive and upper-exclusive.
+///
+/// For a single range key:
+///
+/// ```text
+/// ordering = [date ASC NULLS LAST]
+/// split_points = [
+///   (2022-01-01),
+///   (2023-01-01),
+/// ]
+///
+/// partition 0: date before 2022-01-01
+/// partition 1: date between 2022-01-01 (inclusive) and 2023-01-01 (exclusive)
+/// partition 2: date at/after 2023-01-01
+/// ```
+///
+/// The same model extends to compound keys.
+/// For `ordering = [time ASC, city ASC]`, split points are ordered
+/// lexicographically by `(time, city)`:
+///
+/// ```text
+/// ordering = [time ASC NULLS LAST, city ASC NULLS LAST]
+/// split_points = [
+///   (2022, Allston),
+///   (2023, Allston),
+/// ]
+///
+/// partition 0: keys before  (2022, Allston)
+/// partition 1: keys between (2022, Allston) and (2023, Allston)
+/// partition 2: keys at/after (2023, Allston)
+/// ```
+///
+/// NOTE: Optimizer and execution behavior for this partitioning is 
intentionally
+/// not implemented and will be introduced incrementally.
+#[derive(Debug, Clone)]
+pub struct RangePartitioning {
+    /// Ordered partitioning key.
+    ordering: LexOrdering,
+    /// Boundaries between adjacent partitions.
+    split_points: Vec<SplitPoint>,
+}
+
+/// A boundary between adjacent range partitions.
+///
+/// A split point is a tuple with one [`ScalarValue`] per sort expression in 
the
+/// parent [`RangePartitioning`] ordering.
+#[derive(Debug, Clone, PartialEq)]
+pub struct SplitPoint {
+    values: Vec<ScalarValue>,
+}
+
+impl SplitPoint {
+    /// Creates a new split point from its tuple values.
+    pub fn new(values: Vec<ScalarValue>) -> Self {
+        Self { values }
+    }
+
+    /// Returns the tuple values for this split point.
+    pub fn values(&self) -> &[ScalarValue] {
+        &self.values
+    }
+}
+
+impl Display for SplitPoint {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        let values = self
+            .values
+            .iter()
+            .map(ToString::to_string)
+            .collect::<Vec<_>>()
+            .join(", ");
+        write!(f, "({values})")
+    }
+}
+
+impl RangePartitioning {
+    /// Creates range partitioning metadata.
+    ///
+    /// The caller is responsible for satisfying the contract documented on
+    /// [`RangePartitioning`].
+    pub fn new(ordering: LexOrdering, split_points: Vec<SplitPoint>) -> Self {
+        Self {

Review Comment:
   Given there is an invariant that all the values in split_points are in the 
correct order compared to the ordering, it seems like we should at least offer 
a `RangePartitioning::try_new` that validates that invariant and document that 
`new` does not chekc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to