alamb commented on code in PR #7793:
URL: https://github.com/apache/arrow-datafusion/pull/7793#discussion_r1358761498
##########
datafusion/common/src/stats.rs:
##########
@@ -203,70 +204,94 @@ impl<T: fmt::Debug + Clone + PartialEq + Eq + PartialOrd>
Display for Precision<
/// Fields are optional and can be inexact because the sources
/// sometimes provide approximate estimates for performance reasons
/// and the transformations output are not always predictable.
-#[derive(Debug, Clone, Default, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Statistics {
/// The number of table rows
- pub num_rows: Option<usize>,
+ pub num_rows: Precision<usize>,
/// total bytes of the table rows
- pub total_byte_size: Option<usize>,
+ pub total_byte_size: Precision<usize>,
/// Statistics on a column level
- pub column_statistics: Option<Vec<ColumnStatistics>>,
- /// If true, any field that is `Some(..)` is the actual value in the data
provided by the operator (it is not
- /// an estimate). Any or all other fields might still be None, in which
case no information is known.
- /// if false, any field that is `Some(..)` may contain an inexact estimate
and may not be the actual value.
- pub is_exact: bool,
+ pub column_statistics: Vec<ColumnStatistics>,
}
-impl Display for Statistics {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- if self.num_rows.is_none() && self.total_byte_size.is_none() &&
!self.is_exact {
- return Ok(());
+impl Statistics {
+ /// Returns a [`Statistics`] instance for the given schema by assigning
+ /// unknown statistics to each column in the schema.
+ pub fn new_unknown(schema: &Schema) -> Self {
+ Self {
+ num_rows: Precision::Absent,
+ total_byte_size: Precision::Absent,
+ column_statistics: Statistics::unknown_column(schema),
}
+ }
- let rows = self
- .num_rows
- .map_or_else(|| "None".to_string(), |v| v.to_string());
- let bytes = self
- .total_byte_size
- .map_or_else(|| "None".to_string(), |v| v.to_string());
+ /// Returns an unbounded `ColumnStatistics` for each field in the schema.
+ pub fn unknown_column(schema: &Schema) -> Vec<ColumnStatistics> {
+ schema
+ .fields()
+ .iter()
+ .map(|_| ColumnStatistics::new_unknown())
+ .collect()
+ }
- write!(f, "rows={}, bytes={}, exact={}", rows, bytes, self.is_exact)?;
+ /// If the exactness of a [`Statistics`] instance is lost, this function
relaxes
+ /// the exactness of all information by converting them
[`Precision::Inexact`].
+ pub fn make_inexact(self) -> Self {
Review Comment:
A very minor nit is that perhaps this could be called `into_inexact` to
mirror other Rust libraries where the `into` prefix is used to signal it
consumes `self`
##########
datafusion/core/src/datasource/file_format/csv.rs:
##########
@@ -657,8 +658,8 @@ mod tests {
assert_eq!(tt_batches, 50 /* 100/2 */);
// test metadata
- assert_eq!(exec.statistics().num_rows, None);
- assert_eq!(exec.statistics().total_byte_size, None);
+ assert_eq!(exec.statistics()?.num_rows, Precision::Absent);
Review Comment:
I really like how this PR changes the code to be more explicit
##########
datafusion/common/src/stats.rs:
##########
@@ -203,70 +204,94 @@ impl<T: fmt::Debug + Clone + PartialEq + Eq + PartialOrd>
Display for Precision<
/// Fields are optional and can be inexact because the sources
/// sometimes provide approximate estimates for performance reasons
/// and the transformations output are not always predictable.
-#[derive(Debug, Clone, Default, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Statistics {
/// The number of table rows
- pub num_rows: Option<usize>,
+ pub num_rows: Precision<usize>,
/// total bytes of the table rows
- pub total_byte_size: Option<usize>,
+ pub total_byte_size: Precision<usize>,
/// Statistics on a column level
- pub column_statistics: Option<Vec<ColumnStatistics>>,
- /// If true, any field that is `Some(..)` is the actual value in the data
provided by the operator (it is not
- /// an estimate). Any or all other fields might still be None, in which
case no information is known.
- /// if false, any field that is `Some(..)` may contain an inexact estimate
and may not be the actual value.
- pub is_exact: bool,
+ pub column_statistics: Vec<ColumnStatistics>,
Review Comment:
Could you clarify in the comments if `column_statistics` will always contain
an entry for each column or if it can be empty? I think it is the former
##########
datafusion/core/tests/custom_sources_cases/statistics.rs:
##########
@@ -46,13 +46,10 @@ struct StatisticsValidation {
impl StatisticsValidation {
fn new(stats: Statistics, schema: SchemaRef) -> Self {
- assert!(
- stats
- .column_statistics
- .as_ref()
- .map(|cols| cols.len() == schema.fields().len())
- .unwrap_or(true),
- "if defined, the column statistics vector length should be the
number of fields"
+ assert_eq!(
Review Comment:
This invariant is much nicer to reason about
##########
datafusion/core/src/datasource/statistics.rs:
##########
@@ -33,101 +36,143 @@ pub async fn get_statistics_with_limit(
limit: Option<usize>,
) -> Result<(Vec<PartitionedFile>, Statistics)> {
let mut result_files = vec![];
-
- let mut null_counts = vec![0; file_schema.fields().len()];
- let mut has_statistics = false;
- let (mut max_values, mut min_values) = create_max_min_accs(&file_schema);
-
- let mut is_exact = true;
+ let mut null_counts: Option<Vec<Precision<usize>>> = None;
+ let mut max_values: Option<Vec<Precision<ScalarValue>>> = None;
+ let mut min_values: Option<Vec<Precision<ScalarValue>>> = None;
// The number of rows and the total byte size can be calculated as long as
// at least one file has them. If none of the files provide them, then they
// will be omitted from the statistics. The missing values will be counted
// as zero.
- let mut num_rows = None;
- let mut total_byte_size = None;
+ let mut num_rows: Option<Precision<usize>> = None;
Review Comment:
Is there a need to distinguish between `num_rows = None` and `num_rows =
Precision::Absent`?
In other words I wonder if the code could be simpler if this was
```suggestion
let mut num_rows = Precision::Absent;
```
Then you could rewrite
```
num_rows = Some(if let Some(some_num_rows) = num_rows {
match (file_stats.num_rows, &some_num_rows) {
(Precision::Absent, _) => some_num_rows.to_inexact(),
(lhs, Precision::Absent) => lhs.to_inexact(),
(lhs, rhs) => lhs.add(rhs),
}
} else {
file_stats.num_rows
});
```
perhaps like
```
num_rows = match (num_rows, file_stats.num_rows) {
(Precision::Absent, _) => file_stats.num_rows,
(lhs, Precision::Absent) => lhs.to_inexact(),
(lhs, rhs) => lhs.add(&rhs),
};
```
Which seems like it captures the logic more clearly
##########
datafusion/core/src/datasource/file_format/csv.rs:
##########
@@ -51,6 +36,21 @@ use crate::execution::context::SessionState;
use crate::physical_plan::insert::{DataSink, FileSinkExec};
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
+
+use arrow::csv::WriterBuilder;
Review Comment:
FWIW another way to make these PRs smaller (and less scary to start
reviewing :) ) would be to make a PR that just moves the `use` statements
around (which would be very easy / fast to review)
##########
datafusion/physical-expr/src/analysis.rs:
##########
@@ -64,11 +65,32 @@ impl AnalysisContext {
) -> Self {
let mut column_boundaries = vec![];
for (idx, stats) in statistics.iter().enumerate() {
- column_boundaries.push(ExprBoundaries::from_column(
Review Comment:
This previous formulation seems clearer / more encapsulated / reusable than
the change.
Could we possibly update `ExprBoundaries::from_column` to take a `&Field`
parameter and move IntervalBound construction there?
##########
datafusion/core/src/datasource/statistics.rs:
##########
@@ -165,11 +225,75 @@ pub(crate) fn get_col_stats(
None => None,
};
ColumnStatistics {
- null_count: Some(null_counts[i]),
- max_value,
- min_value,
- distinct_count: None,
+ null_count: null_counts[i].clone(),
+ max_value:
max_value.map(Precision::Exact).unwrap_or(Precision::Absent),
+ min_value:
min_value.map(Precision::Exact).unwrap_or(Precision::Absent),
+ distinct_count: Precision::Absent,
}
})
.collect()
}
+
+/// If the given value is numerically greater than the original maximum value,
+/// set the new maximum value with appropriate exactness information.
+fn set_max_if_greater(
+ max_values: &mut [Precision<ScalarValue>],
+ max_nominee: Precision<ScalarValue>,
+ index: usize,
+) {
+ match (&max_values[index], &max_nominee) {
+ (Precision::Exact(val1), Precision::Exact(val2)) => {
+ if val1 < val2 {
+ max_values[index] = max_nominee;
+ }
+ }
+ (Precision::Exact(val1), Precision::Inexact(val2))
+ | (Precision::Inexact(val1), Precision::Inexact(val2))
+ | (Precision::Inexact(val1), Precision::Exact(val2)) => {
+ if val1 < val2 {
+ max_values[index] = max_nominee.to_inexact()
+ }
+ }
+ (Precision::Inexact(_), Precision::Absent)
+ | (Precision::Exact(_), Precision::Absent) => {
+ max_values[index] = max_values[index].clone().to_inexact()
+ }
+ (Precision::Absent, Precision::Exact(_))
+ | (Precision::Absent, Precision::Inexact(_)) => {
+ max_values[index] = max_nominee.to_inexact()
+ }
+ (Precision::Absent, Precision::Absent) => max_values[index] =
Precision::Absent,
+ }
+}
+
+/// If the given value is numerically lesser than the original minimum value,
+/// set the new minimum value with appropriate exactness information.
+fn set_min_if_lesser(
+ min_values: &mut [Precision<ScalarValue>],
+ min_nominee: Precision<ScalarValue>,
+ index: usize,
+) {
+ match (&min_values[index], &min_nominee) {
+ (Precision::Exact(val1), Precision::Exact(val2)) => {
+ if val1 > val2 {
+ min_values[index] = min_nominee;
+ }
+ }
+ (Precision::Exact(val1), Precision::Inexact(val2))
+ | (Precision::Inexact(val1), Precision::Inexact(val2))
+ | (Precision::Inexact(val1), Precision::Exact(val2)) => {
+ if val1 > val2 {
+ min_values[index] = min_nominee.to_inexact()
+ }
+ }
+ (Precision::Inexact(_), Precision::Absent)
+ | (Precision::Exact(_), Precision::Absent) => {
+ min_values[index] = min_values[index].clone().to_inexact()
+ }
+ (Precision::Absent, Precision::Exact(_))
+ | (Precision::Absent, Precision::Inexact(_)) => {
+ min_values[index] = min_nominee.to_inexact()
+ }
+ (Precision::Absent, Precision::Absent) => min_values[index] =
Precision::Absent,
+ }
+}
Review Comment:
It is somewhat worrying that this file has no unit tests 🤔 (not that your
PR made this any better / worse)
##########
datafusion/physical-expr/src/analysis.rs:
##########
@@ -184,24 +192,17 @@ fn shrink_boundaries(
}
});
let graph_nodes = graph.gather_node_indices(&[expr.clone()]);
- let (_, root_index) = graph_nodes.first().ok_or_else(|| {
- DataFusionError::Internal("Error in constructing predicate
graph".to_string())
- })?;
- let final_result = graph.get_interval(*root_index);
+ // Since propagation result was successful, the graph has at least one
element.
+ // An empty check is also done at the outer scope, do not repeat it here.
+ let (_, root_index) = graph_nodes[0];
Review Comment:
I would prefer the internal error rather than this `[0]` which will panic. I
know it is not supposed to happen, but avoiding a panic would be a nicer
experience for users I think
##########
datafusion/physical-expr/src/intervals/interval_aritmetic.rs:
##########
@@ -1730,20 +1757,23 @@ mod tests {
),
];
for interval in intervals {
- assert_eq!(interval.cardinality()?, distinct_f32);
+ assert_eq!(interval.cardinality()?.unwrap(), distinct_f32);
}
+ // If the floating numbers has showned a homogeneous distribution
pattern, the result would to be
Review Comment:
I don't understand this comment -- maybe @ozankabak can double check too
##########
datafusion/physical-expr/src/analysis.rs:
##########
@@ -184,24 +192,17 @@ fn shrink_boundaries(
}
});
let graph_nodes = graph.gather_node_indices(&[expr.clone()]);
- let (_, root_index) = graph_nodes.first().ok_or_else(|| {
- DataFusionError::Internal("Error in constructing predicate
graph".to_string())
- })?;
- let final_result = graph.get_interval(*root_index);
+ // Since propagation result was successful, the graph has at least one
element.
+ // An empty check is also done at the outer scope, do not repeat it here.
+ let (_, root_index) = graph_nodes[0];
+ let final_result = graph.get_interval(root_index);
- // If during selectivity calculation we encounter an error, use 1.0 as
cardinality estimate
- // safest estimate(e.q largest possible value).
let selectivity = calculate_selectivity(
&final_result.lower.value,
&final_result.upper.value,
&target_boundaries,
&initial_boundaries,
- )
- .unwrap_or(1.0);
-
- if !(0.0..=1.0).contains(&selectivity) {
Review Comment:
why remove this check?
##########
datafusion/physical-expr/src/analysis.rs:
##########
@@ -64,11 +65,32 @@ impl AnalysisContext {
) -> Self {
let mut column_boundaries = vec![];
for (idx, stats) in statistics.iter().enumerate() {
- column_boundaries.push(ExprBoundaries::from_column(
- stats,
- input_schema.fields()[idx].name().clone(),
- idx,
- ));
+ let field: &Arc<Field> = &input_schema.fields()[idx];
+ if let Ok(inf_field) = ScalarValue::try_from(field.data_type()) {
Review Comment:
I think it would also be ok here to return an internal error if we can't
create a `ScalarValue` (not much else in DataFusion is going to work). And it
would make clear we aren't silently ignoring errors.
##########
datafusion/core/src/datasource/statistics.rs:
##########
@@ -165,11 +225,75 @@ pub(crate) fn get_col_stats(
None => None,
};
ColumnStatistics {
- null_count: Some(null_counts[i]),
- max_value,
- min_value,
- distinct_count: None,
+ null_count: null_counts[i].clone(),
+ max_value:
max_value.map(Precision::Exact).unwrap_or(Precision::Absent),
+ min_value:
min_value.map(Precision::Exact).unwrap_or(Precision::Absent),
+ distinct_count: Precision::Absent,
}
})
.collect()
}
+
+/// If the given value is numerically greater than the original maximum value,
+/// set the new maximum value with appropriate exactness information.
+fn set_max_if_greater(
+ max_values: &mut [Precision<ScalarValue>],
+ max_nominee: Precision<ScalarValue>,
+ index: usize,
+) {
+ match (&max_values[index], &max_nominee) {
+ (Precision::Exact(val1), Precision::Exact(val2)) => {
+ if val1 < val2 {
+ max_values[index] = max_nominee;
+ }
+ }
+ (Precision::Exact(val1), Precision::Inexact(val2))
+ | (Precision::Inexact(val1), Precision::Inexact(val2))
+ | (Precision::Inexact(val1), Precision::Exact(val2)) => {
+ if val1 < val2 {
+ max_values[index] = max_nominee.to_inexact()
+ }
+ }
+ (Precision::Inexact(_), Precision::Absent)
+ | (Precision::Exact(_), Precision::Absent) => {
+ max_values[index] = max_values[index].clone().to_inexact()
+ }
+ (Precision::Absent, Precision::Exact(_))
+ | (Precision::Absent, Precision::Inexact(_)) => {
+ max_values[index] = max_nominee.to_inexact()
+ }
+ (Precision::Absent, Precision::Absent) => max_values[index] =
Precision::Absent,
+ }
+}
+
+/// If the given value is numerically lesser than the original minimum value,
+/// set the new minimum value with appropriate exactness information.
+fn set_min_if_lesser(
+ min_values: &mut [Precision<ScalarValue>],
+ min_nominee: Precision<ScalarValue>,
+ index: usize,
+) {
+ match (&min_values[index], &min_nominee) {
+ (Precision::Exact(val1), Precision::Exact(val2)) => {
+ if val1 > val2 {
+ min_values[index] = min_nominee;
+ }
+ }
+ (Precision::Exact(val1), Precision::Inexact(val2))
+ | (Precision::Inexact(val1), Precision::Inexact(val2))
+ | (Precision::Inexact(val1), Precision::Exact(val2)) => {
+ if val1 > val2 {
+ min_values[index] = min_nominee.to_inexact()
+ }
+ }
+ (Precision::Inexact(_), Precision::Absent)
+ | (Precision::Exact(_), Precision::Absent) => {
+ min_values[index] = min_values[index].clone().to_inexact()
+ }
+ (Precision::Absent, Precision::Exact(_))
+ | (Precision::Absent, Precision::Inexact(_)) => {
+ min_values[index] = min_nominee.to_inexact()
+ }
+ (Precision::Absent, Precision::Absent) => min_values[index] =
Precision::Absent,
+ }
+}
Review Comment:
It is somewhat worrying that this file has no unit tests 🤔 (not that your
PR made this any better / worse)
##########
datafusion/physical-expr/src/intervals/interval_aritmetic.rs:
##########
@@ -692,11 +709,22 @@ fn next_value<const INC: bool>(value: ScalarValue) ->
ScalarValue {
}
/// This function computes the cardinality ratio of the given intervals.
+/// If it cannot be calculated, it returns 1.0 meaning full selective.
Review Comment:
```suggestion
/// This function computes the selectivity by computing the cardinality
ratio of the given intervals.
/// If this cannot be calculated for some reason, it returns `1.0`, meaning
fully selective / no filtering.
```
##########
datafusion/sqllogictest/test_files/union.slt:
##########
@@ -540,8 +540,8 @@ UnionExec
----AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1)], aggr=[]
------CoalesceBatchesExec: target_batch_size=8192
--------RepartitionExec: partitioning=Hash([Int64(1)@0], 4), input_partitions=4
-----------AggregateExec: mode=Partial, gby=[1 as Int64(1)], aggr=[]
-------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
Review Comment:
Why did these plans change? Given the inputs have a single input row, I
don't think it actually matters if the `AggregateExec` is before or after the
`RepartitionExec`
##########
datafusion/core/src/datasource/statistics.rs:
##########
@@ -33,101 +36,143 @@ pub async fn get_statistics_with_limit(
limit: Option<usize>,
) -> Result<(Vec<PartitionedFile>, Statistics)> {
let mut result_files = vec![];
-
- let mut null_counts = vec![0; file_schema.fields().len()];
- let mut has_statistics = false;
- let (mut max_values, mut min_values) = create_max_min_accs(&file_schema);
-
- let mut is_exact = true;
+ let mut null_counts: Option<Vec<Precision<usize>>> = None;
+ let mut max_values: Option<Vec<Precision<ScalarValue>>> = None;
+ let mut min_values: Option<Vec<Precision<ScalarValue>>> = None;
// The number of rows and the total byte size can be calculated as long as
// at least one file has them. If none of the files provide them, then they
// will be omitted from the statistics. The missing values will be counted
// as zero.
- let mut num_rows = None;
- let mut total_byte_size = None;
+ let mut num_rows: Option<Precision<usize>> = None;
Review Comment:
The same can be applied to `total_byte_size`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]