This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 93fb7157b4 Extract drive-by fixes from PR 12135 for easier reviewing (#12240)
93fb7157b4 is described below
commit 93fb7157b454f00319d45ce01e1ba2bdebb52b48
Author: June <[email protected]>
AuthorDate: Mon Sep 2 06:10:17 2024 -0600
Extract drive-by fixes from PR 12135 for easier reviewing (#12240)
* Extract drive-by fixes from PR 12135 for easier reviewing
* Add a few more cfgs to silence warnings with different feature sets
* fmt
---
datafusion/common/src/hash_utils.rs | 2 +
datafusion/core/src/datasource/listing/helpers.rs | 16 ++--
datafusion/core/src/datasource/listing/table.rs | 7 +-
.../src/datasource/physical_plan/parquet/mod.rs | 13 +--
.../datasource/physical_plan/parquet/row_filter.rs | 95 ++++++++--------------
datafusion/core/src/datasource/statistics.rs | 26 ++++--
.../core/src/execution/session_state_defaults.rs | 3 +
datafusion/core/src/physical_optimizer/pruning.rs | 4 +
datafusion/physical-expr/src/expressions/binary.rs | 20 +++--
datafusion/physical-plan/src/execution_plan.rs | 2 +-
10 files changed, 93 insertions(+), 95 deletions(-)
diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs
index f3d2a0a4f9..72cfeafd0b 100644
--- a/datafusion/common/src/hash_utils.rs
+++ b/datafusion/common/src/hash_utils.rs
@@ -245,6 +245,8 @@ fn hash_struct_array(
Ok(())
}
+// only adding this `cfg` b/c this function is only used with this `cfg`
+#[cfg(not(feature = "force_hash_collisions"))]
fn hash_map_array(
array: &MapArray,
random_state: &RandomState,
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index dbeaf5dfcc..33a16237e1 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -51,12 +51,12 @@ use object_store::{ObjectMeta, ObjectStore};
/// - the table provider can filter the table partition values with this expression
/// - the expression can be marked as `TableProviderFilterPushDown::Exact` once this filtering
/// was performed
-pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
+pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
let mut is_applicable = true;
expr.apply(|expr| {
match expr {
Expr::Column(Column { ref name, .. }) => {
- is_applicable &= col_names.contains(name);
+ is_applicable &= col_names.contains(&name.as_str());
if is_applicable {
Ok(TreeNodeRecursion::Jump)
} else {
@@ -745,27 +745,27 @@ mod tests {
#[test]
fn test_expr_applicable_for_cols() {
assert!(expr_applicable_for_cols(
- &[String::from("c1")],
+ &["c1"],
&Expr::eq(col("c1"), lit("value"))
));
assert!(!expr_applicable_for_cols(
- &[String::from("c1")],
+ &["c1"],
&Expr::eq(col("c2"), lit("value"))
));
assert!(!expr_applicable_for_cols(
- &[String::from("c1")],
+ &["c1"],
&Expr::eq(col("c1"), col("c2"))
));
assert!(expr_applicable_for_cols(
- &[String::from("c1"), String::from("c2")],
+ &["c1", "c2"],
&Expr::eq(col("c1"), col("c2"))
));
assert!(expr_applicable_for_cols(
- &[String::from("c1"), String::from("c2")],
+ &["c1", "c2"],
&(Expr::eq(col("c1"), col("c2").alias("c2_alias"))).not()
));
assert!(expr_applicable_for_cols(
- &[String::from("c1"), String::from("c2")],
+ &["c1", "c2"],
&(case(col("c1"))
.when(lit("v1"), lit(true))
.otherwise(lit(false))
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 35286612a8..9246226d43 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -826,7 +826,7 @@ impl TableProvider for ListingTable {
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
- let support: Vec<_> = filters
+ Ok(filters
.iter()
.map(|filter| {
if expr_applicable_for_cols(
@@ -834,7 +834,7 @@ impl TableProvider for ListingTable {
.options
.table_partition_cols
.iter()
- .map(|x| x.0.clone())
+ .map(|x| x.0.as_str())
.collect::<Vec<_>>(),
filter,
) {
@@ -846,8 +846,7 @@ impl TableProvider for ListingTable {
TableProviderFilterPushDown::Inexact
}
})
- .collect();
- Ok(support)
+ .collect())
}
fn get_table_definition(&self) -> Option<&str> {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 85d6f8db23..b2f86db742 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -685,10 +685,12 @@ impl ExecutionPlan for ParquetExec {
partition_index: usize,
ctx: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- let projection = match self.base_config.file_column_projection_indices() {
- Some(proj) => proj,
- None => (0..self.base_config.file_schema.fields().len()).collect(),
- };
+ let projection = self
+ .base_config
+ .file_column_projection_indices()
+ .unwrap_or_else(|| {
+ (0..self.base_config.file_schema.fields().len()).collect()
+ });
let parquet_file_reader_factory = self
.parquet_file_reader_factory
@@ -698,8 +700,7 @@ impl ExecutionPlan for ParquetExec {
ctx.runtime_env()
.object_store(&self.base_config.object_store_url)
.map(|store| {
- Arc::new(DefaultParquetFileReaderFactory::new(store))
- as Arc<dyn ParquetFileReaderFactory>
+ Arc::new(DefaultParquetFileReaderFactory::new(store)) as _
})
})?;
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
index 23fdadc2cd..59d23fd68c 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -59,6 +59,7 @@
//! the unsorted predicates. Within each partition, predicates are
//! still be sorted by size.
+use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::sync::Arc;
@@ -129,7 +130,7 @@ impl DatafusionArrowPredicate {
// on the order they appear in the file
let projection = match candidate.projection.len() {
0 | 1 => vec![],
- _ => remap_projection(&candidate.projection),
+ 2.. => remap_projection(&candidate.projection),
};
Ok(Self {
@@ -151,32 +152,31 @@ impl ArrowPredicate for DatafusionArrowPredicate {
&self.projection_mask
}
- fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
- let batch = match self.projection.is_empty() {
- true => batch,
- false => batch.project(&self.projection)?,
+ fn evaluate(&mut self, mut batch: RecordBatch) -> ArrowResult<BooleanArray> {
+ if !self.projection.is_empty() {
+ batch = batch.project(&self.projection)?;
};
let batch = self.schema_mapping.map_partial_batch(batch)?;
// scoped timer updates on drop
let mut timer = self.time.timer();
- match self
- .physical_expr
+
+ self.physical_expr
.evaluate(&batch)
.and_then(|v| v.into_array(batch.num_rows()))
- {
- Ok(array) => {
+ .and_then(|array| {
let bool_arr = as_boolean_array(&array)?.clone();
let num_filtered = bool_arr.len() - bool_arr.true_count();
self.rows_filtered.add(num_filtered);
timer.stop();
Ok(bool_arr)
- }
- Err(e) => Err(ArrowError::ComputeError(format!(
- "Error evaluating filter predicate: {e:?}"
- ))),
- }
+ })
+ .map_err(|e| {
+ ArrowError::ComputeError(format!(
+ "Error evaluating filter predicate: {e:?}"
+ ))
+ })
}
}
@@ -453,62 +453,33 @@ pub fn build_row_filter(
// no candidates
if candidates.is_empty() {
- Ok(None)
- } else if reorder_predicates {
- // attempt to reorder the predicates by size and whether they are sorted
- candidates.sort_by_key(|c| c.required_bytes);
-
- let (indexed_candidates, other_candidates): (Vec<_>, Vec<_>) =
- candidates.into_iter().partition(|c| c.can_use_index);
-
- let mut filters: Vec<Box<dyn ArrowPredicate>> = vec![];
-
- for candidate in indexed_candidates {
- let filter = DatafusionArrowPredicate::try_new(
- candidate,
- file_schema,
- metadata,
- rows_filtered.clone(),
- time.clone(),
- Arc::clone(&schema_mapping),
- )?;
-
- filters.push(Box::new(filter));
- }
-
- for candidate in other_candidates {
- let filter = DatafusionArrowPredicate::try_new(
- candidate,
- file_schema,
- metadata,
- rows_filtered.clone(),
- time.clone(),
- Arc::clone(&schema_mapping),
- )?;
+ return Ok(None);
+ }
- filters.push(Box::new(filter));
- }
+ if reorder_predicates {
+ candidates.sort_unstable_by(|c1, c2| {
+ match c1.can_use_index.cmp(&c2.can_use_index) {
+ Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes),
+ ord => ord,
+ }
+ });
+ }
- Ok(Some(RowFilter::new(filters)))
- } else {
- // otherwise evaluate the predicates in the order the appeared in the
- // original expressions
- let mut filters: Vec<Box<dyn ArrowPredicate>> = vec![];
- for candidate in candidates {
- let filter = DatafusionArrowPredicate::try_new(
+ candidates
+ .into_iter()
+ .map(|candidate| {
+ DatafusionArrowPredicate::try_new(
candidate,
file_schema,
metadata,
rows_filtered.clone(),
time.clone(),
Arc::clone(&schema_mapping),
- )?;
-
- filters.push(Box::new(filter));
- }
-
- Ok(Some(RowFilter::new(filters)))
- }
+ )
+ .map(|pred| Box::new(pred) as _)
+ })
+ .collect::<Result<Vec<_>, _>>()
+ .map(|filters| Some(RowFilter::new(filters)))
}
#[cfg(test)]
diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs
index 6f89657def..201bbfd5c0 100644
--- a/datafusion/core/src/datasource/statistics.rs
+++ b/datafusion/core/src/datasource/statistics.rs
@@ -18,16 +18,21 @@
use std::mem;
use std::sync::Arc;
-use arrow_schema::DataType;
use futures::{Stream, StreamExt};
use datafusion_common::stats::Precision;
use datafusion_common::ScalarValue;
-use crate::arrow::datatypes::{Schema, SchemaRef};
+use crate::arrow::datatypes::SchemaRef;
use crate::error::Result;
-use crate::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
+use crate::physical_plan::{ColumnStatistics, Statistics};
+
+#[cfg(feature = "parquet")]
+use crate::{
+ arrow::datatypes::Schema,
+ functions_aggregate::min_max::{MaxAccumulator, MinAccumulator},
+ physical_plan::Accumulator,
+};
use super::listing::PartitionedFile;
@@ -144,6 +149,8 @@ pub async fn get_statistics_with_limit(
Ok((result_files, statistics))
}
+// only adding this cfg b/c this is the only feature it's used with currently
+#[cfg(feature = "parquet")]
pub(crate) fn create_max_min_accs(
schema: &Schema,
) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
@@ -175,6 +182,8 @@ fn add_row_stats(
}
}
+// only adding this cfg b/c this is the only feature it's used with currently
+#[cfg(feature = "parquet")]
pub(crate) fn get_col_stats(
schema: &Schema,
null_counts: Vec<Precision<usize>>,
@@ -205,8 +214,13 @@ pub(crate) fn get_col_stats(
// (aka non Dictionary) output. We need to adjust the output data type to reflect this.
// The reason min/max aggregate produces unpacked output because there is only one
// min/max value per group; there is no needs to keep them Dictionary encode
-fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
- if let DataType::Dictionary(_, value_type) = input_type {
+//
+// only adding this cfg b/c this is the only feature it's used with currently
+#[cfg(feature = "parquet")]
+fn min_max_aggregate_data_type(
+ input_type: &arrow_schema::DataType,
+) -> &arrow_schema::DataType {
+ if let arrow_schema::DataType::Dictionary(_, value_type) = input_type {
value_type.as_ref()
} else {
input_type
diff --git a/datafusion/core/src/execution/session_state_defaults.rs b/datafusion/core/src/execution/session_state_defaults.rs
index bc7e194cae..b5370efa0a 100644
--- a/datafusion/core/src/execution/session_state_defaults.rs
+++ b/datafusion/core/src/execution/session_state_defaults.rs
@@ -100,7 +100,9 @@ impl SessionStateDefaults {
/// returns the list of default [`ScalarUDF']'s
pub fn default_scalar_functions() -> Vec<Arc<ScalarUDF>> {
+ #[cfg_attr(not(feature = "nested_expressions"), allow(unused_mut))]
let mut functions: Vec<Arc<ScalarUDF>> =
functions::all_default_functions();
+
#[cfg(feature = "nested_expressions")]
functions.append(&mut functions_nested::all_default_nested_functions());
@@ -144,6 +146,7 @@ impl SessionStateDefaults {
}
/// registers all the builtin array functions
+ #[cfg_attr(not(feature = "nested_expressions"), allow(unused_variables))]
pub fn register_array_functions(state: &mut SessionState) {
// register crate of array expressions (if enabled)
#[cfg(feature = "nested_expressions")]
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs
index a16abc607e..9bc2bb1d1d 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -615,6 +615,8 @@ impl PruningPredicate {
is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
}
+ // this is only used by `parquet` feature right now
+ #[allow(dead_code)]
pub(crate) fn required_columns(&self) -> &RequiredColumns {
&self.required_columns
}
@@ -746,6 +748,8 @@ impl RequiredColumns {
/// * `a > 5 OR a < 10` returns `Some(a)`
/// * `a > 5 OR b < 10` returns `None`
/// * `true` returns None
+ #[allow(dead_code)]
+ // this fn is only used by `parquet` feature right now, thus the `allow(dead_code)`
pub(crate) fn single_column(&self) -> Option<&phys_expr::Column> {
if self.columns.windows(2).all(|w| {
// check if all columns are the same (ignoring statistics and field)
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index 2680a7930f..08c133d719 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -33,6 +33,7 @@ use arrow::compute::kernels::comparison::{
use arrow::compute::kernels::concat_elements::concat_elements_utf8;
use arrow::compute::{cast, ilike, like, nilike, nlike};
use arrow::datatypes::*;
+use arrow_schema::ArrowError;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::{internal_err, Result, ScalarValue};
use datafusion_expr::interval_arithmetic::{apply_operator, Interval};
@@ -133,12 +134,15 @@ impl std::fmt::Display for BinaryExpr {
}
/// Invoke a boolean kernel on a pair of arrays
-macro_rules! boolean_op {
- ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
- let ll = as_boolean_array($LEFT).expect("boolean_op failed to downcast array");
- let rr = as_boolean_array($RIGHT).expect("boolean_op failed to downcast array");
- Ok(Arc::new($OP(&ll, &rr)?))
- }};
+#[inline]
+fn boolean_op(
+ left: &dyn Array,
+ right: &dyn Array,
+ op: impl FnOnce(&BooleanArray, &BooleanArray) -> Result<BooleanArray, ArrowError>,
+) -> Result<Arc<(dyn Array + 'static)>, ArrowError> {
+ let ll = as_boolean_array(left).expect("boolean_op failed to downcast left array");
+ let rr = as_boolean_array(right).expect("boolean_op failed to downcast right array");
+ op(ll, rr).map(|t| Arc::new(t) as _)
}
macro_rules! binary_string_array_flag_op {
@@ -596,7 +600,7 @@ impl BinaryExpr {
| NotLikeMatch | NotILikeMatch => unreachable!(),
And => {
if left_data_type == &DataType::Boolean {
- boolean_op!(&left, &right, and_kleene)
+ Ok(boolean_op(&left, &right, and_kleene)?)
} else {
internal_err!(
"Cannot evaluate binary expression {:?} with types {:?} and {:?}",
@@ -608,7 +612,7 @@ impl BinaryExpr {
}
Or => {
if left_data_type == &DataType::Boolean {
- boolean_op!(&left, &right, or_kleene)
+ Ok(boolean_op(&left, &right, or_kleene)?)
} else {
internal_err!(
"Cannot evaluate binary expression {:?} with types {:?} and {:?}",
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 53ae59f707..f584542faf 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -718,7 +718,7 @@ pub fn execute_stream(
match plan.output_partitioning().partition_count() {
0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
1 => plan.execute(0, context),
- _ => {
+ 2.. => {
// merge into a single partition
let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
// CoalescePartitionsExec must produce a single partition
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]