alamb commented on code in PR #22207:
URL: https://github.com/apache/datafusion/pull/22207#discussion_r3274121399


##########
datafusion/ffi/src/physical_expr/partitioning.rs:
##########
@@ -45,6 +45,10 @@ impl From<&Partitioning> for FFI_Partitioning {
                     .collect();
                 Self::Hash(exprs, *size)
             }
+            // FFI does not yet expose range partition metadata.

Review Comment:
   When we merge this PR, it might help to make GitHub issues for each of these subtasks / TODOs and add a link in the code. That way, if people stumble across the gap in the code, they can find the issue, and we would have a nice list of tasks to do (either for other contributors or for coding agents).
   
   Something like:
   ```rust
               // FFI does not yet expose range partition metadata.
               // See https://github.com/apache/datafusion/issues/XYZ
   ```



##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -133,13 +136,225 @@ impl Display for Partitioning {
                     .join(", ");
                 write!(f, "Hash([{phy_exprs_str}], {size})")
             }
+            Partitioning::Range(range) => write!(f, "{range}"),
             Partitioning::UnknownPartitioning(size) => {
                 write!(f, "UnknownPartitioning({size})")
             }
         }
     }
 }
 
+/// Physical range partitioning.
+///
+/// [`RangePartitioning`] describes range bounds over one or more physical
+/// expressions. Each [`RangePartition`] represents one output partition and must
+/// contain exactly one [`RangeInterval`] for each partition expression.
+///
+/// The source declaring this partitioning is responsible for ensuring that, for
+/// every emitted row, the row belongs to exactly one partition and is emitted by
+/// that partition. The declared ranges do not need to cover values that the plan
+/// cannot emit.
+///
+/// Each lower and upper bound explicitly records whether it is inclusive.
+/// Unbounded sides are represented with `None`, bound values should be non-null
+/// until null routing semantics are defined.
+///
+/// For example, a scan can declare date and city range partitions as:
+///
+/// ```text
+/// exprs = [date, city]
+///
+/// partition 0:
+///   date in [2021-01-01, 2022-01-01)
+///   city in [Allston, Boston)
+///
+/// partition 1:
+///   date in [2021-01-01, 2022-01-01)
+///   city in [Boston, NYC)
+/// ```

Review Comment:
   > I found that they store physical partitioning metadata, but did not find anything like a “multi-dimensional repartition this row.”
   
   I agree -- Influx's model is best modeled as a "compound key" (it is not multi-dimensional partitioning).
   
   
   > So I think compound-key range partitioning is the right move. If there is a use for this I would say that this should be its own separate implementation.
   
   I agree
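   This minimal Rust sketch shows the distinction concretely. It is independent of DataFusion, and `Key`, `in_compound_range`, and `in_box` are hypothetical names. It reads the bounds of partition 1 from the example in two ways. As a compound key, rows compare lexicographically, like Rust tuples. As a multi-dimensional box, each column is checked independently. Dates are assumed to be ISO-8601 strings, so string order matches date order.

```rust
/// Hypothetical row key for `exprs = [date, city]`; dates are ISO-8601 strings,
/// so lexicographic string order matches chronological order.
type Key<'a> = (&'a str, &'a str);

/// Compound-key membership in `[lower, upper)`: tuples compare lexicographically,
/// first by date, and by city only when the dates are equal.
fn in_compound_range<'a>(key: Key<'a>, lower: Key<'a>, upper: Key<'a>) -> bool {
    lower <= key && key < upper
}

/// Multi-dimensional ("box") membership: each column must independently fall
/// within its own `[lower, upper)` interval.
fn in_box<'a>(key: Key<'a>, lower: Key<'a>, upper: Key<'a>) -> bool {
    (lower.0 <= key.0 && key.0 < upper.0) && (lower.1 <= key.1 && key.1 < upper.1)
}
```

   With bounds `("2021-01-01", "Boston")` and `("2022-01-01", "NYC")`, the row `("2021-06-01", "Zurich")` falls inside the compound range but outside the box. The compound key only consults `city` when `date` ties.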
   



##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -133,13 +136,225 @@ impl Display for Partitioning {
                     .join(", ");
                 write!(f, "Hash([{phy_exprs_str}], {size})")
             }
+            Partitioning::Range(range) => write!(f, "{range}"),
             Partitioning::UnknownPartitioning(size) => {
                 write!(f, "UnknownPartitioning({size})")
             }
         }
     }
 }
 
+/// Physical range partitioning.
+///
+/// [`RangePartitioning`] describes range bounds over one or more physical
+/// expressions. Each [`RangePartition`] represents one output partition and must
+/// contain exactly one [`RangeInterval`] for each partition expression.
+///
+/// The source declaring this partitioning is responsible for ensuring that, for
+/// every emitted row, the row belongs to exactly one partition and is emitted by
+/// that partition. The declared ranges do not need to cover values that the plan
+/// cannot emit.
+///
+/// Each lower and upper bound explicitly records whether it is inclusive.
+/// Unbounded sides are represented with `None`, bound values should be non-null
+/// until null routing semantics are defined.
+///
+/// For example, a scan can declare date and city range partitions as:
+///
+/// ```text
+/// exprs = [date, city]
+///
+/// partition 0:
+///   date in [2021-01-01, 2022-01-01)
+///   city in [Allston, Boston)
+///
+/// partition 1:
+///   date in [2021-01-01, 2022-01-01)
+///   city in [Boston, NYC)
+/// ```
+///
+/// NOTE: Optimizer and execution behavior for this partitioning is intentionally
+/// not implemented and will be introduced incrementally. This public API keeps
+/// the partition ranges explicit for users. Repartitioning may compile the same
+/// metadata into a more efficient internal router.
+#[derive(Debug, Clone)]
+pub struct RangePartitioning {
+    partition_exprs: Vec<Arc<dyn PhysicalExpr>>,
+    partitions: Vec<RangePartition>,
+}
+
+impl RangePartitioning {
+    /// Creates range partitioning metadata.
+    ///
+    /// The caller is responsible for ensuring each partition has one range per
+    /// partition expression and for satisfying the contract documented on
+    /// [`RangePartitioning`].
+    pub fn new(
+        partition_exprs: Vec<Arc<dyn PhysicalExpr>>,
+        partitions: Vec<RangePartition>,
+    ) -> Self {
+        Self {
+            partition_exprs,
+            partitions,
+        }
+    }
+
+    /// Returns the partition expressions.
+    pub fn partition_exprs(&self) -> &[Arc<dyn PhysicalExpr>] {
+        &self.partition_exprs
+    }
+
+    /// Returns the declared range partitions.
+    pub fn partitions(&self) -> &[RangePartition] {
+        &self.partitions
+    }
+
+    /// Returns the number of partitions.
+    pub fn partition_count(&self) -> usize {
+        self.partitions.len()
+    }
+
+    fn project(
+        &self,
+        mapping: &ProjectionMapping,
+        input_eq_properties: &EquivalenceProperties,
+    ) -> Option<Self> {
+        let partition_exprs = input_eq_properties
+            .project_expressions(&self.partition_exprs, mapping)
+            .collect::<Option<Vec<_>>>()?;
+
+        Some(Self {
+            partition_exprs,
+            partitions: self.partitions.clone(),
+        })
+    }
+}
+
+impl Display for RangePartitioning {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        let partitions = self
+            .partitions
+            .iter()
+            .map(|partition| format!("{partition}"))
+            .collect::<Vec<_>>()
+            .join(", ");
+        write!(
+            f,
+            "Range({}, [{}], {})",
+            format_physical_expr_list(&self.partition_exprs),
+            partitions,
+            self.partition_count()
+        )
+    }
+}
+
+impl PartialEq for RangePartitioning {
+    fn eq(&self, other: &Self) -> bool {
+        physical_exprs_equal(&self.partition_exprs, &other.partition_exprs)
+            && self.partitions == other.partitions
+    }
+}
+
+/// Ranges for one output partition in a [`RangePartitioning`].
+#[derive(Debug, Clone, PartialEq)]
+pub struct RangePartition {
+    ranges: Vec<RangeInterval>,
+}
+
+impl RangePartition {
+    /// Creates a partition from one range per partition expression.
+    pub fn new(ranges: Vec<RangeInterval>) -> Self {
+        Self { ranges }
+    }
+
+    /// Returns the ranges for this partition.
+    pub fn ranges(&self) -> &[RangeInterval] {
+        &self.ranges
+    }
+}
+
+impl Display for RangePartition {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        let ranges = self
+            .ranges
+            .iter()
+            .map(|range| format!("{range}"))
+            .collect::<Vec<_>>()
+            .join(", ");
+        write!(f, "({ranges})")
+    }
+}
+
+/// A scalar interval in one range partition dimension.
+#[derive(Debug, Clone, PartialEq)]
+pub struct RangeInterval {
+    lower: Option<RangeBound>,
+    upper: Option<RangeBound>,

Review Comment:
   > A points representation must cover the entire set of valid values (by construction).
   
   I also prefer a split points representation for the same reason. Specifically, I think split points ensure that any particular row value is in EXACTLY one partition. That would prevent user errors that leave some rows in no partition, or in more than one partition.

   This would also make the sorting semantics easier.
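   Here is a minimal sketch of the split-points alternative. It assumes strictly increasing split points over any `Ord` key, including compound keys such as `(date, city)` tuples. `SplitPoints` and `partition_for` are hypothetical names, not DataFusion APIs. `n` split points always carve the key space into `n + 1` contiguous pieces. Routing is therefore a binary search with `slice::partition_point`, and every key lands in exactly one partition by construction.

```rust
/// Hypothetical split-point representation: `n` strictly increasing split points
/// define `n + 1` partitions. Partition `i` holds keys in `[splits[i-1], splits[i])`;
/// the first partition is unbounded below and the last is unbounded above.
struct SplitPoints<K: Ord> {
    splits: Vec<K>,
}

impl<K: Ord> SplitPoints<K> {
    /// Returns `None` if the split points are not strictly increasing.
    fn new(splits: Vec<K>) -> Option<Self> {
        if splits.windows(2).all(|w| w[0] < w[1]) {
            Some(Self { splits })
        } else {
            None
        }
    }

    fn partition_count(&self) -> usize {
        self.splits.len() + 1
    }

    /// Routes a key to its partition: the number of split points `<= key`.
    fn partition_for(&self, key: &K) -> usize {
        self.splits.partition_point(|s| s <= key)
    }
}
```

   There is a trade-off. Lower bounds are always inclusive and the outer partitions are always unbounded. This form therefore cannot express per-bound inclusivity or leave gaps for values the plan cannot emit.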
   
   
   
   



##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -133,13 +136,225 @@ impl Display for Partitioning {
                     .join(", ");
                 write!(f, "Hash([{phy_exprs_str}], {size})")
             }
+            Partitioning::Range(range) => write!(f, "{range}"),
             Partitioning::UnknownPartitioning(size) => {
                 write!(f, "UnknownPartitioning({size})")
             }
         }
     }
 }
 
+/// Physical range partitioning.
+///
+/// [`RangePartitioning`] describes range bounds over one or more physical
+/// expressions. Each [`RangePartition`] represents one output partition and must
+/// contain exactly one [`RangeInterval`] for each partition expression.
+///
+/// The source declaring this partitioning is responsible for ensuring that, for
+/// every emitted row, the row belongs to exactly one partition and is emitted by

Review Comment:
   I liked @stuhood's suggestion to make it harder to abuse this contract by ensuring that every value is assigned to a partition -- I'll respond in more detail in the conversation below.
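   If explicit ranges are kept, one way to make the contract harder to abuse is to validate the declared ranges up front. This is a minimal sketch for a single `i64` partition expression. `Bound`, `Interval`, and `covers_exactly_once` are hypothetical stand-ins, not the PR's types. Each bound is treated as a "cut" in the value order. The check requires that the first partition is unbounded below, the last is unbounded above, and neighbours meet at the same cut, so no value is left out or assigned twice. Because cuts are modeled as in a dense order, the check is conservative for integers. For example, it rejects `(-inf, 4]` followed by `[5, +inf)`.

```rust
/// Hypothetical stand-in for a range bound, specialized to i64.
#[derive(Debug, Clone, PartialEq)]
struct Bound {
    value: i64,
    inclusive: bool,
}

/// `None` on either side means unbounded, mirroring `Option<RangeBound>` in the PR.
#[derive(Debug, Clone, PartialEq)]
struct Interval {
    lower: Option<Bound>,
    upper: Option<Bound>,
}

/// A bound's position as a cut: `(v, false)` sits just before `v` and
/// `(v, true)` just after it. Since `false < true`, tuple order matches cut order.
fn lower_cut(b: &Bound) -> (i64, bool) {
    (b.value, !b.inclusive)
}

fn upper_cut(b: &Bound) -> (i64, bool) {
    (b.value, b.inclusive)
}

/// Returns true only if the intervals, in order, assign every i64 to exactly one partition.
fn covers_exactly_once(intervals: &[Interval]) -> bool {
    // The first partition must be unbounded below and the last unbounded above.
    match (intervals.first(), intervals.last()) {
        (Some(first), Some(last)) if first.lower.is_none() && last.upper.is_none() => {}
        _ => return false,
    }
    // No interval may be inverted: its lower cut must not come after its upper cut.
    let well_formed = intervals.iter().all(|iv| match (&iv.lower, &iv.upper) {
        (Some(l), Some(u)) => lower_cut(l) <= upper_cut(u),
        _ => true,
    });
    // Neighbours must meet at exactly the same cut: no gap and no overlap.
    let contiguous = intervals.windows(2).all(|w| match (&w[0].upper, &w[1].lower) {
        (Some(u), Some(l)) => upper_cut(u) == lower_cut(l),
        _ => false,
    });
    well_formed && contiguous
}
```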



-- 
This is an automated message from the Apache Git Service.