alamb commented on code in PR #22207:
URL: https://github.com/apache/datafusion/pull/22207#discussion_r3274516944
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -133,13 +137,176 @@ impl Display for Partitioning {
.join(", ");
write!(f, "Hash([{phy_exprs_str}], {size})")
}
+ Partitioning::Range(range) => write!(f, "{range}"),
Partitioning::UnknownPartitioning(size) => {
write!(f, "UnknownPartitioning({size})")
}
}
}
}
+/// Physical range partitioning.
+///
+/// [`RangePartitioning`] describes an ordered key space with split points.
+///
+/// - `sort_exprs` define the partitioning key and ordering.
Review Comment:
great
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -133,13 +137,176 @@ impl Display for Partitioning {
.join(", ");
write!(f, "Hash([{phy_exprs_str}], {size})")
}
+ Partitioning::Range(range) => write!(f, "{range}"),
Partitioning::UnknownPartitioning(size) => {
write!(f, "UnknownPartitioning({size})")
}
}
}
}
+/// Physical range partitioning.
+///
+/// [`RangePartitioning`] describes an ordered key space with split points.
+///
+/// - `sort_exprs` define the partitioning key and ordering.
+/// - `split_points` define the boundaries between adjacent partitions. Each
+/// split point is a tuple with one [`ScalarValue`] per sort expression.
+/// - The declaring source must ensure every emitted row belongs to exactly one
+/// declared partition and is emitted by that partition.
+///
+/// The sort expressions must be non-empty, and split points must be strictly
+/// ordered according to those sort expressions.
+///
+/// For a single range key:
+///
+/// ```text
+/// sort_exprs = [date ASC NULLS LAST]
+/// split_points = [
+/// (2022-01-01),
+/// (2023-01-01),
+/// ]
+///
+/// partition 0: date before 2022-01-01
+/// partition 1: date between 2022-01-01 and 2023-01-01
Review Comment:
I think we should carefully and formally define the edge conditions and how
split points define partitions. I think the idea is that each split point is
`<=` every key in the partition that follows it (lower bound inclusive).
So, for example, I think that would mean that `N` split points define `N+1`
partitions, and the key range would be divided across partitions as follows:
* Partition 0: `key < split_points[0]`
* Partition 1: `split_points[0] <= key < split_points[1]`
* ...
* Partition N-1: `split_points[N-2] <= key < split_points[N-1]`
* Partition N: `split_points[N-1] <= key`
```suggestion
/// partition 1: date between 2022-01-01 (inclusive) and 2023-01-01 (exclusive)
```
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -133,13 +137,176 @@ impl Display for Partitioning {
.join(", ");
write!(f, "Hash([{phy_exprs_str}], {size})")
}
+ Partitioning::Range(range) => write!(f, "{range}"),
Partitioning::UnknownPartitioning(size) => {
write!(f, "UnknownPartitioning({size})")
}
}
}
}
+/// Physical range partitioning.
+///
+/// [`RangePartitioning`] describes an ordered key space with split points.
+///
+/// - `sort_exprs` define the partitioning key and ordering.
+/// - `split_points` define the boundaries between adjacent partitions. Each
+/// split point is a tuple with one [`ScalarValue`] per sort expression.
+/// - The declaring source must ensure every emitted row belongs to exactly one
+/// declared partition and is emitted by that partition.
Review Comment:
I think the new definition defines a clear partition for every row -- so it
is not possible to emit a row that does not belong to a partition
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1482,6 +1492,11 @@ impl ExecutionPlan for RepartitionExec {
if !self.maintains_input_order()[0] {
return Ok(SortOrderPushdownResult::Unsupported);
}
+ if matches!(self.partitioning(), Partitioning::Range(_)) {
Review Comment:
I recommend using an explicit match here so it is clear what is
supported and what is not (and the guard is less likely to be removed
accidentally)
```rust
match self.partitioning() {
...
}
```
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -133,13 +137,176 @@ impl Display for Partitioning {
.join(", ");
write!(f, "Hash([{phy_exprs_str}], {size})")
}
+ Partitioning::Range(range) => write!(f, "{range}"),
Partitioning::UnknownPartitioning(size) => {
write!(f, "UnknownPartitioning({size})")
}
}
}
}
+/// Physical range partitioning.
+///
+/// [`RangePartitioning`] describes an ordered key space with split points.
+///
+/// - `sort_exprs` define the partitioning key and ordering.
+/// - `split_points` define the boundaries between adjacent partitions. Each
+/// split point is a tuple with one [`ScalarValue`] per sort expression.
+/// - The declaring source must ensure every emitted row belongs to exactly one
+/// declared partition and is emitted by that partition.
+///
+/// The sort expressions must be non-empty, and split points must be strictly
+/// ordered according to those sort expressions.
+///
+/// For a single range key:
+///
+/// ```text
+/// sort_exprs = [date ASC NULLS LAST]
+/// split_points = [
+/// (2022-01-01),
+/// (2023-01-01),
+/// ]
+///
+/// partition 0: date before 2022-01-01
+/// partition 1: date between 2022-01-01 and 2023-01-01
+/// partition 2: date at/after 2023-01-01
+/// ```
+///
+/// The same model extends to compound keys.
+/// For `sort_exprs = [time ASC, city ASC]`, split points are ordered
+/// lexicographically by `(time, city)`:
+///
+/// ```text
+/// sort_exprs = [time ASC NULLS LAST, city ASC NULLS LAST]
+/// split_points = [
+/// (2022, Allston),
+/// (2023, Allston),
+/// ]
+///
+/// partition 0: keys before (2022, Allston)
+/// partition 1: keys between (2022, Allston) and (2023, Allston)
+/// partition 2: keys at/after (2023, Allston)
+/// ```
+///
+/// NOTE: Optimizer and execution behavior for this partitioning is
intentionally
+/// not implemented and will be introduced incrementally.
+#[derive(Debug, Clone)]
+pub struct RangePartitioning {
+ /// Ordered partitioning key. Sort options are part of the partitioning
+ /// because `ASC`/`DESC` and null ordering decide which side of a split
point
+ /// a row belongs to.
+ sort_exprs: Vec<PhysicalSortExpr>,
+ /// Boundaries between adjacent partitions. `N` split points define `N + 1`
+ /// lower-inclusive, upper-exclusive partitions. Values equal to a split
+ /// point belong to the partition after that split point.
Review Comment:
See above for a potentially more formal way of specifying this. I recommend
making the docs on `RangePartitioning` detailed and just leaving a pointer from
`split_points` to the main docs
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -133,13 +137,176 @@ impl Display for Partitioning {
.join(", ");
write!(f, "Hash([{phy_exprs_str}], {size})")
}
+ Partitioning::Range(range) => write!(f, "{range}"),
Partitioning::UnknownPartitioning(size) => {
write!(f, "UnknownPartitioning({size})")
}
}
}
}
+/// Physical range partitioning.
+///
+/// [`RangePartitioning`] describes an ordered key space with split points.
+///
+/// - `sort_exprs` define the partitioning key and ordering.
+/// - `split_points` define the boundaries between adjacent partitions. Each
+/// split point is a tuple with one [`ScalarValue`] per sort expression.
+/// - The declaring source must ensure every emitted row belongs to exactly one
+/// declared partition and is emitted by that partition.
+///
+/// The sort expressions must be non-empty, and split points must be strictly
+/// ordered according to those sort expressions.
+///
+/// For a single range key:
+///
+/// ```text
+/// sort_exprs = [date ASC NULLS LAST]
+/// split_points = [
+/// (2022-01-01),
+/// (2023-01-01),
+/// ]
+///
+/// partition 0: date before 2022-01-01
+/// partition 1: date between 2022-01-01 and 2023-01-01
+/// partition 2: date at/after 2023-01-01
+/// ```
+///
+/// The same model extends to compound keys.
+/// For `sort_exprs = [time ASC, city ASC]`, split points are ordered
+/// lexicographically by `(time, city)`:
+///
+/// ```text
+/// sort_exprs = [time ASC NULLS LAST, city ASC NULLS LAST]
+/// split_points = [
+/// (2022, Allston),
+/// (2023, Allston),
+/// ]
+///
+/// partition 0: keys before (2022, Allston)
+/// partition 1: keys between (2022, Allston) and (2023, Allston)
+/// partition 2: keys at/after (2023, Allston)
+/// ```
+///
+/// NOTE: Optimizer and execution behavior for this partitioning is
intentionally
+/// not implemented and will be introduced incrementally.
+#[derive(Debug, Clone)]
+pub struct RangePartitioning {
+ /// Ordered partitioning key. Sort options are part of the partitioning
+ /// because `ASC`/`DESC` and null ordering decide which side of a split
point
+ /// a row belongs to.
+ sort_exprs: Vec<PhysicalSortExpr>,
+ /// Boundaries between adjacent partitions. `N` split points define `N + 1`
+ /// lower-inclusive, upper-exclusive partitions. Values equal to a split
+ /// point belong to the partition after that split point.
+ split_points: Vec<Vec<ScalarValue>>,
Review Comment:
For future API extensibility, I recommend wrapping this Vec in a struct
```rust
split_points: Vec<SplitPoint>,
```
And then
```rust
struct SplitPoint {
points: Vec<ScalarValue>
}
```
That would both give us a good place to add documentation and things like
`Display` impls, and if we ever wanted to add additional types of split points
(like maybe `inf` or expressions) we wouldn't have to make a bunch of API
changes
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -133,13 +137,176 @@ impl Display for Partitioning {
.join(", ");
write!(f, "Hash([{phy_exprs_str}], {size})")
}
+ Partitioning::Range(range) => write!(f, "{range}"),
Partitioning::UnknownPartitioning(size) => {
write!(f, "UnknownPartitioning({size})")
}
}
}
}
+/// Physical range partitioning.
+///
+/// [`RangePartitioning`] describes an ordered key space with split points.
+///
+/// - `sort_exprs` define the partitioning key and ordering.
+/// - `split_points` define the boundaries between adjacent partitions. Each
+/// split point is a tuple with one [`ScalarValue`] per sort expression.
+/// - The declaring source must ensure every emitted row belongs to exactly one
+/// declared partition and is emitted by that partition.
+///
+/// The sort expressions must be non-empty, and split points must be strictly
+/// ordered according to those sort expressions.
+///
+/// For a single range key:
+///
+/// ```text
+/// sort_exprs = [date ASC NULLS LAST]
+/// split_points = [
+/// (2022-01-01),
+/// (2023-01-01),
+/// ]
+///
+/// partition 0: date before 2022-01-01
+/// partition 1: date between 2022-01-01 and 2023-01-01
+/// partition 2: date at/after 2023-01-01
+/// ```
+///
+/// The same model extends to compound keys.
+/// For `sort_exprs = [time ASC, city ASC]`, split points are ordered
+/// lexicographically by `(time, city)`:
+///
+/// ```text
+/// sort_exprs = [time ASC NULLS LAST, city ASC NULLS LAST]
+/// split_points = [
+/// (2022, Allston),
+/// (2023, Allston),
+/// ]
+///
+/// partition 0: keys before (2022, Allston)
+/// partition 1: keys between (2022, Allston) and (2023, Allston)
+/// partition 2: keys at/after (2023, Allston)
+/// ```
+///
+/// NOTE: Optimizer and execution behavior for this partitioning is
intentionally
+/// not implemented and will be introduced incrementally.
+#[derive(Debug, Clone)]
+pub struct RangePartitioning {
+ /// Ordered partitioning key. Sort options are part of the partitioning
+ /// because `ASC`/`DESC` and null ordering decide which side of a split
point
+ /// a row belongs to.
+ sort_exprs: Vec<PhysicalSortExpr>,
Review Comment:
Instead of `sort_exprs`, what do you think about using the pre-existing
`LexOrdering`:
https://docs.rs/datafusion/latest/datafusion/physical_expr/struct.LexOrdering.html
?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]